diff --git a/CODEOWNERS b/CODEOWNERS
index 0d67557e93f..0a87f268105 100644
--- a/CODEOWNERS
+++ b/CODEOWNERS
@@ -615,6 +615,7 @@
 /include/zephyr/posix/ @pfalcon
 /include/zephyr/pm/pm.h @nashif @ceolin
 /include/zephyr/drivers/ptp_clock.h @tbursztyka
+/include/zephyr/rtio/ @teburd
 /include/zephyr/shared_irq.h @dcpleung @nashif @andyross
 /include/zephyr/shell/ @jakub-uC @nordic-krch
 /include/zephyr/shell/shell_mqtt.h @ycsin
@@ -815,6 +816,7 @@ scripts/gen_image_info.py @tejlmand
 /tests/subsys/debug/coredump/ @dcpleung
 /tests/subsys/fs/ @nashif @nvlsianpu @de-nordic
 /tests/subsys/sd/ @danieldegrasse
+/tests/subsys/rtio/ @teburd
 /tests/subsys/settings/ @nvlsianpu
 /tests/subsys/shell/ @jakub-uC @nordic-krch
 # Get all docs reviewed
diff --git a/doc/services/index.rst b/doc/services/index.rst
index 9335536dc21..2e72bde0c4d 100644
--- a/doc/services/index.rst
+++ b/doc/services/index.rst
@@ -27,4 +27,5 @@ OS Services
    tfm/index
    usb/index.rst
    virtualization/index.rst
+   rtio/index.rst
    misc.rst
diff --git a/doc/services/rtio/index.rst b/doc/services/rtio/index.rst
new file mode 100644
index 00000000000..cac0e36fbc3
--- /dev/null
+++ b/doc/services/rtio/index.rst
@@ -0,0 +1,431 @@
+.. _rtio_api:
+
+Real Time I/O (RTIO)
+####################
+
+.. contents::
+   :local:
+   :depth: 2
+
+.. image:: rings.png
+   :width: 800
+   :alt: Submissions and Completion Ring Queues
+
+RTIO provides a framework for doing asynchronous operation chains with event
+driven I/O. This section covers the RTIO API, queues, executor, iodev,
+and common usage patterns with peripheral devices.
+
+RTIO takes a lot of inspiration from Linux's io_uring in its operations and
+API, as that API matches up well with hardware DMA transfer queues and
+descriptions.
+
+A quick sales pitch on why RTIO works well in many scenarios:
+
+1. API is DMA and interrupt friendly
+2. No buffer copying
+3. No callbacks
+4. Blocking or non-blocking operation
+
+Problem
+*******
+
+An application wishing to do complex DMA or interrupt driven operations in
+Zephyr today requires direct knowledge of the hardware and how it works. There
+is no understanding in the DMA API of other Zephyr devices and how they relate.
+
+This means doing complex audio, video, or sensor streaming requires direct
+hardware knowledge or leaky abstractions over DMA controllers. Neither is ideal.
+
+To enable asynchronous operations, especially with DMA, what is needed is a
+description of what to do rather than direct operations through C code and
+callbacks. Enabling DMA features such as channels with priority and sequences
+of transfers requires more than a simple list of descriptions.
+
+Using DMA and/or interrupt driven I/O shouldn't dictate whether the call is
+blocking or not.
+
+Inspiration, introducing io_uring
+*********************************
+
+It's better not to reinvent the wheel (or ring in this case), and io_uring as
+an API from the Linux kernel provides a winning model. In io_uring there are
+two lock-free ring buffers acting as queues shared between the kernel and a
+userland application: one queue of submission entries, which may be chained
+and flushed to create concurrent sequential requests, and a second queue of
+completion queue events. Only a single syscall, io_uring_submit, is actually
+required to execute many operations. This call may block the caller when a
+number of operations to wait on is given.
+
+This model maps well to DMA and interrupt driven transfers.
+A request to do a sequence of operations in an asynchronous way directly
+relates to the way hardware typically works with interrupt driven state
+machines, potentially involving multiple peripheral IPs like bus and DMA
+controllers.
+
+Submission Queue and Chaining
+*****************************
+
+The submission queue (sq) is the description of the operations to perform in
+concurrent chains.
+
+For example, imagine a typical SPI transfer where you wish to write a register
+address and then read from it. The sequence of operations might be:
+
+   1. Chip Select
+   2. Clock Enable
+   3. Write register address into SPI transmit register
+   4. Read from the SPI receive register into a buffer
+   5. Disable clock
+   6. Disable Chip Select
+
+If anything in this chain of operations fails, give up. Some of those
+operations can be embodied in a device abstraction that understands that a
+read or write implicitly means setting up the clock and chip select. The
+transactional nature of the request also needs to be embodied in some manner.
+Of the operations above, perhaps the read could be done using DMA, as it is
+large enough to make sense. That requires an understanding of how to set up
+the device's particular DMA to do so.
+
+The above sequence of operations is embodied in RTIO as a chain of submission
+queue entries (sqe). Chaining is done by setting a bitflag in an sqe to
+signify that the next sqe must wait on the current one.
+
+Because the chip select and clocking are common to a particular SPI controller
+and device on the bus, they are embodied in what RTIO calls an iodev.
+
+Multiple operations against the same iodev are done in the order provided as
+soon as possible. If two operation chains have varying points using the same
+device, it's possible one chain will have to wait for another to complete.
+
+Completion Queue
+****************
+
+In order to know when an sqe has completed there is a completion queue (cq)
+with completion queue events (cqe). Once an sqe completes, a cqe is pushed
+into the cq. The cqe may not arrive in the same order as their sqe, but a
+chain of sqe will ensure ordering and failure cascading.
+
+Other potential schemes are possible, but a completion queue is a well trodden
+idea with io_uring and other similar operating system APIs.
+
+Executor and IODev
+******************
+
+Turning submission queue entries (sqe) into completion queue events (cqe) is
+the job of objects implementing the executor and iodev APIs. These APIs
+coordinate with each other to enable things like DMA transfers.
+
+The end result of these APIs should be a method to resolve the request by
+deciding some of the following questions with heuristic/constraint based
+decision making.
+
+* Polling, Interrupt, or DMA transfer?
+* If DMA, are the requirements met (peripheral supported by the DMAC, etc.)?
+
+The executor is meant to provide policy for when to use each transfer type,
+and provide the common code for walking through submission queue chains by
+providing calls the iodev may use to signal completion, error, or a need to
+suspend and wait.
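+
+The executor side of this coordination is intentionally small. A sketch of the
+API, abridged from ``include/zephyr/rtio/rtio.h`` in this change:
+
+.. code-block:: C
+
+    struct rtio_executor_api {
+            /* Submit the request queue to the executor */
+            int (*submit)(struct rtio *r);
+
+            /* An SQE completed successfully */
+            void (*ok)(struct rtio *r, const struct rtio_sqe *sqe, int result);
+
+            /* An SQE failed to complete */
+            void (*err)(struct rtio *r, const struct rtio_sqe *sqe, int result);
+    };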
+
+Outstanding Questions
+*********************
+
+RTIO is not a complete API and solution, and is currently evolving to best
+fit the nature of an RTOS. The general idea behind a pair of queues to
+describe requests and completions seems sound and has been proven out in
+other contexts. Questions remain though.
+
+Timeouts and Deadlines
+======================
+
+Timeouts and deadlines are key to being Real-Time. Real-Time in Zephyr means
+being able to do things when an application wants them done. That could mean
+different things, from a deadline with best effort attempts to a timeout and
+failure.
+
+These features would surely be useful in many cases, but would likely add
+significant complexity. It's something to decide upon, and even if enabled it
+would likely be a compile time optional feature, leading to complex testing.
+
+Cancellation
+============
+
+Canceling an already queued operation could be possible with a small API
+addition, perhaps taking both the RTIO context and a pointer to the
+submission queue entry. However, cancellation as an API induces many potential
+complexities that might not be appropriate. It's something to be decided upon.
+
+Userspace Support
+=================
+
+RTIO with userspace is certainly plausible but would require the equivalent of
+a memory map call to map the shared ringbuffers and also potentially DMA
+buffers.
+
+Additionally, a DMA buffer interface would likely need to be provided for
+coherence and MMU usage.
+
+IODev and Executor API
+======================
+
+Lastly, the API between an executor and iodev is incomplete.
+
+There are certain interactions that should be supported. Perhaps things like
+expanding a submission queue entry into multiple submission queue entries in
+order to split up work that can be done by a device and work that can be done
+by a DMA controller.
+
+In some SoCs only specific DMA channels may be used with specific devices. In
+others there are requirements around needing a DMA handshake or specific
+triggering setups to tell the DMA when to start its operation.
+
+None of that, from the outward facing API, is an issue.
+
+It is, however, an unresolved issue for the internal API between the executor
+and iodev. This requires some SoC specifics, and enabling those generically
+isn't likely possible. That's OK: an iodev and DMA executor may be vendor
+specific, but the API between them must not be!
+
+
+Special Hardware: Intel HDA
+===========================
+
+In some cases there's a need to always do things in a specific order with a
+specific buffer allocation strategy. Consider a DMA that *requires* the usage
+of a circular buffer segmented into blocks that may only be transferred one
+after another. This is the case of the Intel HDA stream for audio.
+
+In this scenario the above API can still work, but would require an additional
+buffer allocator to work with fixed size segments.
+
+When to Use
+***********
+
+It's important to understand when DMA-like transfers are useful and when they
+are not. It's a poor idea to assume that something made for high throughput
+will work for you. There is a computational, memory, and latency cost to set
+up the description of transfers.
+
+Polling an air sensor at 1Hz will almost certainly be a net negative compared
+to ad-hoc sensor (I2C/SPI) requests to get the sample.
+
+Continuous transfers, driven by timer or interrupt, of data from a
+peripheral's on-board FIFO over I2C, I3C, SPI, MIPI, I2S, etc... maybe, but
+not always!
+
+Examples
+********
+
+Examples speak loudly about the intended uses and goals of an API, so several
+key examples are presented below. Some are entirely plausible today without a
+big leap. Others (the sensor example) would require additional work in other
+APIs outside of RTIO as a subsystem and are theoretical.
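+
+Note that the iodev definitions used below (``RTIO_I2C_IODEV``,
+``RTIO_SPI_IODEV``, ``RTIO_SENSOR_IODEV``) are illustrative; they assume bus
+and sensor drivers that provide iodev implementations, which this change does
+not yet include.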
+
+Chained Blocking Requests
+=========================
+
+A common scenario is needing to write a register address and then read from
+it. This can be accomplished by chaining a write into a read operation.
+
+The transaction on I2C is implicit for each operation chain.
+
+.. code-block:: C
+
+    RTIO_I2C_IODEV(i2c_dev, I2C_DT_SPEC_INST(n));
+    RTIO_DEFINE(ez_io, 4, 4);
+    static uint16_t reg_addr;
+    static uint8_t buf[32];
+
+    int do_some_io(void)
+    {
+        struct rtio_sqe *write_sqe = rtio_spsc_acquire(ez_io.sq);
+        struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
+
+        rtio_sqe_prep_write(write_sqe, i2c_dev, RTIO_PRIO_LOW, &reg_addr, 2);
+        write_sqe->flags = RTIO_SQE_CHAINED; /* the next item in the queue will wait on this one */
+
+        rtio_sqe_prep_read(read_sqe, i2c_dev, RTIO_PRIO_LOW, buf, 32);
+
+        rtio_submit(rtio_inplace_executor, &ez_io, 2);
+
+        /* completions for a chain arrive in chain order, write then read */
+        struct rtio_cqe *write_cqe = rtio_spsc_consume(ez_io.cq);
+        struct rtio_cqe *read_cqe = rtio_spsc_consume(ez_io.cq);
+
+        if (write_cqe->result < 0) {
+            LOG_ERR("write failed!");
+        }
+
+        if (read_cqe->result < 0) {
+            LOG_ERR("read failed!");
+        }
+
+        rtio_spsc_release(ez_io.cq);
+        rtio_spsc_release(ez_io.cq);
+    }
+
+Non-blocking device to device
+=============================
+
+Imagine wishing to read from one device on an I2C bus and then write the same
+buffer to a device on a SPI bus without blocking the thread or setting up
+callbacks or other IPC notification mechanisms.
+
+Perhaps an I2C temperature sensor and a SPI LoRaWAN module. The following is a
+simplified version of that potential operation chain.
+
+.. code-block:: C
+
+    RTIO_I2C_IODEV(i2c_dev, I2C_DT_SPEC_INST(n));
+    RTIO_SPI_IODEV(spi_dev, SPI_DT_SPEC_INST(m));
+
+    RTIO_DEFINE(ez_io, 4, 4);
+    static uint8_t buf[32];
+
+    int do_some_io(void)
+    {
+        uint32_t read, write;
+        struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
+
+        rtio_sqe_prep_read(read_sqe, i2c_dev, RTIO_PRIO_LOW, buf, 32, &read);
+        read_sqe->flags = RTIO_SQE_CHAINED; /* the next item in the queue will wait on this one */
+
+        /* Safe to do as the chained operation *ensures* that if one fails all subsequent ops fail */
+        struct rtio_sqe *write_sqe = rtio_spsc_acquire(ez_io.sq);
+
+        rtio_sqe_prep_write(write_sqe, spi_dev, RTIO_PRIO_LOW, buf, 32, &write);
+
+        /* call will return immediately without blocking if possible */
+        rtio_submit(rtio_inplace_executor, &ez_io, 0);
+
+        /* These calls might return NULL if the operations have not yet completed! */
+        for (int i = 0; i < 2; i++) {
+            struct rtio_cqe *cqe = rtio_spsc_consume(ez_io.cq);
+
+            while (cqe == NULL) {
+                cqe = rtio_spsc_consume(ez_io.cq);
+                k_yield();
+            }
+
+            if (cqe->userdata == &read && cqe->result < 0) {
+                LOG_ERR("read from i2c failed!");
+            }
+
+            if (cqe->userdata == &write && cqe->result < 0) {
+                LOG_ERR("write to spi failed!");
+            }
+
+            /* Must release the completion queue event after consume */
+            rtio_spsc_release(ez_io.cq);
+        }
+    }
+
+Nested iodevs for Devices on Buses (Sensors), Theoretical
+=========================================================
+
+Consider a device like a sensor or audio codec sitting on a bus.
+
+It's useful to consider that the sensor driver can use RTIO to do I/O on the
+SPI bus, while also being an RTIO device itself. The sensor iodev can set
+aside a small portion of the buffer in front or in back to store some metadata
+describing the format of the data.
+This metadata could then be used in creating a sensor readings iterator which
+lazily lets you map over each reading, doing calculations such as FIR/IIR
+filtering, or perhaps translating the readings into other numerical formats
+with useful measurement units such as SI. RTIO is a common data movement API
+and allows for such uses while not deciding the mechanism.
+
+This same sort of setup could be done for other data streams such as audio or
+video.
+
+.. code-block:: C
+
+    /* Note that the sensor device itself can use RTIO to get data over I2C/SPI,
+     * potentially with DMA, but we don't need to worry about that here.
+     * All we need to know is the devicetree node_id and that it can be an iodev.
+     */
+    RTIO_SENSOR_IODEV(sensor_dev, DEVICE_DT_GET(DT_NODE(super6axis)));
+
+    RTIO_DEFINE(ez_io, 4, 4);
+
+
+    /* The sensor driver decides the minimum buffer size for us, we decide how
+     * many bufs. This could be a typical multiple of a FIFO packet the sensor
+     * produces; the ICM42688 for example produces a FIFO packet of 20 bytes in
+     * 20-bit mode at 32KHz, so perhaps we'd like to get 4 buffers of 4ms of
+     * data each in this setup to process, and the buffer size has already been
+     * defined here for us.
+     */
+    #include
+    static uint8_t bufs[4][ICM42688_RTIO_BUF_SIZE];
+
+    int do_some_sensors(void) {
+        /* Obtain a dmac executor from the DMA device */
+        struct device *dma = DEVICE_DT_GET(DT_NODE(dma0));
+        const struct rtio_executor *rtio_dma_exec =
+            dma_rtio_executor(dma);
+
+        /*
+         * Set the executor for our queue context
+         */
+        rtio_set_executor(ez_io, rtio_dma_exec);
+
+        /* Mostly we want to feed the sensor driver enough buffers to fill while
+         * we wait and process! Small enough to process quickly with low latency,
+         * big enough to not spend all the time setting transfers up.
+         *
+         * It's assumed here that the sensor has been configured already, and
+         * that on each FIFO watermark interrupt it attempts to pull a buffer
+         * from the queue, filling it (with a small metadata offset) using its
+         * own RTIO request to the SPI bus using DMA.
+         */
+        for (int i = 0; i < 4; i++) {
+            struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
+
+            rtio_sqe_prep_read(read_sqe, sensor_dev, RTIO_PRIO_HIGH, bufs[i], ICM42688_RTIO_BUF_SIZE);
+        }
+
+        struct device *sensor = DEVICE_DT_GET(DT_NODE(super6axis));
+        struct sensor_reader reader;
+        struct sensor_channels channels[4] = {
+            SENSOR_TIMESTAMP_CHANNEL,
+            SENSOR_CHANNEL(int32_t, SENSOR_ACC_X, 0, SENSOR_RAW),
+            SENSOR_CHANNEL(int32_t, SENSOR_ACC_Y, 0, SENSOR_RAW),
+            SENSOR_CHANNEL(int32_t, SENSOR_ACC_Z, 0, SENSOR_RAW),
+        };
+
+        while (true) {
+            /* call will wait for one completion event */
+            rtio_submit(ez_io, 1);
+
+            struct rtio_cqe *cqe = rtio_spsc_consume(ez_io.cq);
+
+            if (cqe->result < 0) {
+                LOG_ERR("read failed!");
+                goto next;
+            }
+
+            /* Bytes read into the buffer */
+            int32_t bytes_read = cqe->result;
+
+            /* Retrieve soon to be reusable buffer pointer from completion */
+            uint8_t *buf = cqe->userdata;
+
+            /* Get an iterator (reader) that obtains sensor readings in integer
+             * form, 16 bit signed values in the native sensor reading format
+             */
+            int res = sensor_reader(sensor, buf, bytes_read, &reader, channels,
+                                    sizeof(channels));
+            __ASSERT(res == 0, "sensor_reader should succeed");
+
+            while (sensor_reader_next(&reader)) {
+                printf("time(raw): %d, acc (x,y,z): (%d, %d, %d)\n",
+                       channels[0].value.u32, channels[1].value.i32,
+                       channels[2].value.i32, channels[3].value.i32);
+            }
+
+    next:
+            /* Release completion queue event */
+            rtio_spsc_release(ez_io.cq);
+
+            /* resubmit a read request with the newly freed buffer to the sensor */
+            struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
+
+            rtio_sqe_prep_read(read_sqe, sensor_dev, RTIO_PRIO_HIGH, buf, ICM42688_RTIO_BUF_SIZE);
+        }
+    }
+
+API Reference
+*************
+
+RTIO API
+========
+
+.. doxygengroup:: rtio_api
+
+RTIO SPSC API
+=============
+
+.. doxygengroup:: rtio_spsc
diff --git a/doc/services/rtio/rings.png b/doc/services/rtio/rings.png
new file mode 100644
index 00000000000..cd0f9ab0415
Binary files /dev/null and b/doc/services/rtio/rings.png differ
diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h
new file mode 100644
index 00000000000..fdff8d1c155
--- /dev/null
+++ b/include/zephyr/rtio/rtio.h
@@ -0,0 +1,627 @@
+/*
+ * Copyright (c) 2022 Intel Corporation
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+/**
+ * @file
+ * @brief Real-Time IO device API for moving bytes with low effort
+ *
+ * RTIO uses an SPSC lock-free queue pair to enable a DMA and ISR friendly I/O API.
+ *
+ * I/O like operations are set up in a pre-allocated queue with a fixed number of
+ * submission requests. Each submission request takes the device to operate on
+ * and an operation. The rest is information needed to perform the operation,
+ * such as a register or mmio address of the device, a source/destination buffer
+ * to read from or write to, and other pertinent information.
+ *
+ * These operations may be chained in such a way that only when the current
+ * operation is complete will the next be executed. If the current request fails
+ * all chained requests will also fail.
+ *
+ * The completions of these requests are pushed into a fixed size completion
+ * queue which an application may actively poll for completions.
+ *
+ * An executor (which could use a DMA controller!) takes the queues and
+ * determines how to perform each requested operation. By default there is a
+ * software executor which does all operations in software using software
+ * device APIs.
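+ *
+ * A minimal sketch of the flow, following the test suite in this change
+ * (the iodev and userdata here are stand-ins):
+ *
+ * @code
+ * RTIO_EXECUTOR_SIMPLE_DEFINE(my_exec);
+ * RTIO_DEFINE(my_rtio, (struct rtio_executor *)&my_exec, 4, 4);
+ *
+ * struct rtio_sqe *sqe = rtio_spsc_acquire(my_rtio.sq);
+ *
+ * rtio_sqe_prep_nop(sqe, my_iodev, my_userdata);
+ * rtio_submit(&my_rtio, 1);
+ *
+ * struct rtio_cqe *cqe = rtio_spsc_consume(my_rtio.cq);
+ *
+ * // inspect cqe->result and cqe->userdata, then free the slot
+ * rtio_spsc_release(my_rtio.cq);
+ * @endcode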
+ */
+
+#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_H_
+#define ZEPHYR_INCLUDE_RTIO_RTIO_H_
+
+#include
+#include
+#include
+#include
+#include
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+/**
+ * @brief RTIO
+ * @defgroup rtio RTIO
+ * @{
+ * @}
+ */
+
+
+struct rtio_iodev;
+
+/**
+ * @brief RTIO API
+ * @defgroup rtio_api RTIO API
+ * @ingroup rtio
+ * @{
+ */
+
+/**
+ * @brief RTIO Predefined Priorities
+ * @defgroup rtio_sqe_prio RTIO Priorities
+ * @ingroup rtio_api
+ * @{
+ */
+
+/**
+ * @brief Low priority
+ */
+#define RTIO_PRIO_LOW 0U
+
+/**
+ * @brief Normal priority
+ */
+#define RTIO_PRIO_NORM 127U
+
+/**
+ * @brief High priority
+ */
+#define RTIO_PRIO_HIGH 255U
+
+/**
+ * @}
+ */
+
+
+/**
+ * @brief RTIO SQE Flags
+ * @defgroup rtio_sqe_flags RTIO SQE Flags
+ * @ingroup rtio_api
+ * @{
+ */
+
+/**
+ * @brief The next request in the queue should wait on this one.
+ */
+#define RTIO_SQE_CHAINED BIT(0)
+
+/**
+ * @}
+ */
+
+/**
+ * @brief A submission queue event
+ */
+struct rtio_sqe {
+        uint8_t op; /**< Op code */
+
+        uint8_t prio; /**< Op priority */
+
+        uint16_t flags; /**< Op Flags */
+
+        struct rtio_iodev *iodev; /**< Device to operate on */
+
+        /**
+         * User provided pointer to data which is returned upon operation
+         * completion.
+         *
+         * If unique identification of completions is desired this should be
+         * unique as well.
+         */
+        void *userdata;
+
+        union {
+                struct {
+                        uint32_t buf_len; /**< Length of buffer */
+
+                        uint8_t *buf; /**< Buffer to use */
+                };
+        };
+};
+
+/**
+ * @brief Submission queue
+ *
+ * This is used for typifying the members of an RTIO queue pair
+ * but nothing more.
+ */
+struct rtio_sq {
+        struct rtio_spsc _spsc;
+        struct rtio_sqe buffer[];
+};
+
+/**
+ * @brief A completion queue event
+ */
+struct rtio_cqe {
+        int32_t result; /**< Result from operation */
+        void *userdata; /**< Associated userdata with operation */
+};
+
+/**
+ * @brief Completion queue
+ *
+ * This is used for typifying the members of an RTIO queue pair
+ * but nothing more.
+ */
+struct rtio_cq {
+        struct rtio_spsc _spsc;
+        struct rtio_cqe buffer[];
+};
+
+struct rtio;
+
+struct rtio_executor_api {
+        /**
+         * @brief Submit the request queue to the executor
+         *
+         * The executor is responsible for interpreting the submission queue and
+         * creating concurrent execution chains.
+         *
+         * Concurrency is optional and implementation dependent.
+         */
+        int (*submit)(struct rtio *r);
+
+        /**
+         * @brief SQE completes successfully
+         */
+        void (*ok)(struct rtio *r, const struct rtio_sqe *sqe, int result);
+
+        /**
+         * @brief SQE fails to complete
+         */
+        void (*err)(struct rtio *r, const struct rtio_sqe *sqe, int result);
+};
+
+/**
+ * @brief An executor does the work of executing the submissions.
+ *
+ * This could be a DMA controller backed executor, thread backed,
+ * or simple in place executor.
+ *
+ * A DMA executor might schedule all transfers with priorities
+ * and use hardware arbitration.
+ *
+ * A threaded executor might use a thread pool where each transfer
+ * chain is executed across the thread pool and the priority of the
+ * transfer is used as the thread priority.
+ *
+ * A simple in place executor might simply loop over and execute each
+ * transfer in the calling thread's context. Priority is then entirely
+ * derived from the calling thread.
+ *
+ * An implementation of the executor must place this struct as
+ * its first member such that pointer aliasing works.
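+ *
+ * The simple executor in rtio_executor_simple.h does exactly this:
+ *
+ * @code
+ * struct rtio_simple_executor {
+ *         struct rtio_executor ctx;
+ * };
+ * @endcode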
+ */
+struct rtio_executor {
+        const struct rtio_executor_api *api;
+};
+
+/**
+ * @brief An RTIO queue pair that both the kernel and application work with
+ *
+ * The kernel is the consumer of the submission queue, and producer of the completion queue.
+ * The application is the consumer of the completion queue and producer of the submission queue.
+ *
+ * Nothing is done until a call is performed to do the work (rtio_submit).
+ */
+struct rtio {
+
+        /*
+         * An executor which does the job of working through the submission
+         * queue.
+         */
+        struct rtio_executor *executor;
+
+
+#ifdef CONFIG_RTIO_SUBMIT_SEM
+        /* A wait semaphore which may suspend the calling thread
+         * to wait for some number of completions when calling submit
+         */
+        struct k_sem *submit_sem;
+
+        uint32_t submit_count;
+#endif
+
+#ifdef CONFIG_RTIO_CONSUME_SEM
+        /* A wait semaphore which may suspend the calling thread
+         * to wait for some number of completions while consuming
+         * them from the completion queue
+         */
+        struct k_sem *consume_sem;
+#endif
+
+        /* Number of completions that were unable to be submitted with results
+         * due to the cq spsc being full
+         */
+        atomic_t xcqcnt;
+
+        /* Submission queue */
+        struct rtio_sq *sq;
+
+        /* Completion queue */
+        struct rtio_cq *cq;
+};
+
+/**
+ * @brief API that an RTIO IO device should implement
+ */
+struct rtio_iodev_api {
+        /**
+         * @brief Submission function for a request to the iodev
+         *
+         * The iodev is responsible for doing the operation described
+         * as a submission queue entry and reporting results using
+         * `rtio_sqe_ok` or `rtio_sqe_err` once done.
+         */
+        void (*submit)(const struct rtio_sqe *sqe,
+                       struct rtio *r);
+
+        /**
+         * TODO some form of transactional piece is missing here
+         * where we wish to "transact" on an iodev with multiple requests
+         * over a chain.
+         *
+         * Only once explicitly released or the chain fails do we want
+         * to release. Once released any pending iodevs in the queue
+         * should be done.
+         *
+         * Something akin to a lock/unlock pair.
+         */
+};
+
+/* IO device submission queue entry */
+struct rtio_iodev_sqe {
+        const struct rtio_sqe *sqe;
+        struct rtio *r;
+};
+
+/**
+ * @brief IO device submission queue
+ *
+ * This is used for reifying the member of the rtio_iodev struct
+ */
+struct rtio_iodev_sq {
+        struct rtio_spsc _spsc;
+        struct rtio_iodev_sqe buffer[];
+};
+
+/**
+ * @brief An IO device with a function table for submitting requests
+ *
+ * This is required to be the first member of every iodev. There's a strong
+ * possibility this will be extended with some common data fields (statistics)
+ * in the future.
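+ *
+ * The test iodev in this change is a representative embedding (abridged):
+ *
+ * @code
+ * struct rtio_iodev_test {
+ *         struct rtio_iodev iodev; // must be first
+ *         struct k_timer timer;    // device specific state
+ * };
+ * @endcode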
+ */
+struct rtio_iodev {
+        /* Function pointer table */
+        const struct rtio_iodev_api *api;
+
+        /* Queue of RTIO contexts with requests */
+        struct rtio_iodev_sq *iodev_sq;
+};
+
+
+/** An operation that does nothing and will complete immediately */
+#define RTIO_OP_NOP 0
+
+/** An operation that receives (reads) */
+#define RTIO_OP_RX 1
+
+/** An operation that transmits (writes) */
+#define RTIO_OP_TX 2
+
+/**
+ * @brief Prepare a nop (no op) submission
+ */
+static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe,
+                                     struct rtio_iodev *iodev,
+                                     void *userdata)
+{
+        sqe->op = RTIO_OP_NOP;
+        sqe->iodev = iodev;
+        sqe->userdata = userdata;
+}
+
+/**
+ * @brief Prepare a read op submission
+ */
+static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
+                                      struct rtio_iodev *iodev,
+                                      uint8_t prio,
+                                      uint8_t *buf,
+                                      uint32_t len,
+                                      void *userdata)
+{
+        sqe->op = RTIO_OP_RX;
+        sqe->prio = prio;
+        sqe->iodev = iodev;
+        sqe->buf_len = len;
+        sqe->buf = buf;
+        sqe->userdata = userdata;
+}
+
+/**
+ * @brief Prepare a write op submission
+ */
+static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
+                                       struct rtio_iodev *iodev,
+                                       uint8_t prio,
+                                       uint8_t *buf,
+                                       uint32_t len,
+                                       void *userdata)
+{
+        sqe->op = RTIO_OP_TX;
+        sqe->prio = prio;
+        sqe->iodev = iodev;
+        sqe->buf_len = len;
+        sqe->buf = buf;
+        sqe->userdata = userdata;
+}
+
+/**
+ * @brief Statically define and initialize a fixed length submission queue.
+ *
+ * @param name Name of the submission queue.
+ * @param len Queue length, power of 2 required (2, 4, 8).
+ */
+#define RTIO_SQ_DEFINE(name, len) \
+        static RTIO_SPSC_DEFINE(name, struct rtio_sqe, len)
+
+/**
+ * @brief Statically define and initialize a fixed length completion queue.
+ *
+ * @param name Name of the completion queue.
+ * @param len Queue length, power of 2 required (2, 4, 8).
+ */
+#define RTIO_CQ_DEFINE(name, len) \
+        static RTIO_SPSC_DEFINE(name, struct rtio_cqe, len)
+
+/**
+ * @brief Statically define and initialize an RTIO context
+ *
+ * @param name Name of the RTIO
+ * @param exec Symbol for rtio_executor (pointer)
+ * @param sq_sz Size of the submission queue, must be power of 2
+ * @param cq_sz Size of the completion queue, must be power of 2
+ */
+#define RTIO_DEFINE(name, exec, sq_sz, cq_sz) \
+        IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \
+        IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (K_SEM_DEFINE(_consume_sem_##name, 0, 1))) \
+        RTIO_SQ_DEFINE(_sq_##name, sq_sz); \
+        RTIO_CQ_DEFINE(_cq_##name, cq_sz); \
+        static struct rtio name = { \
+                .executor = (exec), \
+                .xcqcnt = ATOMIC_INIT(0), \
+                IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_sem = &_submit_sem_##name,)) \
+                IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_count = 0,)) \
+                IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (.consume_sem = &_consume_sem_##name,)) \
+                .sq = (struct rtio_sq *const)&_sq_##name, \
+                .cq = (struct rtio_cq *const)&_cq_##name, \
+        }
+
+/**
+ * @brief Set the executor of the rtio context
+ */
+static inline void rtio_set_executor(struct rtio *r, struct rtio_executor *exc)
+{
+        r->executor = exc;
+}
+
+/**
+ * @brief Perform a submitted operation with an iodev
+ *
+ * @param sqe Submission to work on
+ * @param r RTIO context
+ */
+static inline void rtio_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r)
+{
+        sqe->iodev->api->submit(sqe, r);
+}
+
+/**
+ * @brief Submit I/O requests to the underlying executor
+ *
+ * Submits the queue of requested IO operation chains to the underlying
+ * executor.
+ * The underlying executor will decide on which hardware and with what sort of
+ * parallelism the execution of IO chains is performed.
+ *
+ * @param r RTIO context
+ * @param wait_count Count of completions to wait for. If a non-zero wait
+ * count is given, the call will not return until the desired number of
+ * completions are done. A non-zero wait count requires the caller be on a
+ * thread.
+ *
+ * @retval 0 On success
+ */
+static inline int rtio_submit(struct rtio *r, uint32_t wait_count)
+{
+        int res;
+
+        __ASSERT(r->executor != NULL, "expected rtio submit context to have an executor");
+
+#ifdef CONFIG_RTIO_SUBMIT_SEM
+        /* TODO undefined behavior if another thread calls submit of course
+         */
+        if (wait_count > 0) {
+                __ASSERT(!k_is_in_isr(),
+                         "expected rtio submit with wait count to be called from a thread");
+
+                k_sem_reset(r->submit_sem);
+                r->submit_count = wait_count;
+        }
+#endif
+
+        /* Enqueue all prepared submissions */
+        rtio_spsc_produce_all(r->sq);
+
+        /* Submit the queue to the executor which consumes submissions
+         * and produces completions through ISR chains or other means.
+         */
+        res = r->executor->api->submit(r);
+        if (res != 0) {
+                return res;
+        }
+
+        /* TODO could be nicer if we could suspend the thread and not
+         * wake up on each completion here.
+         */
+#ifdef CONFIG_RTIO_SUBMIT_SEM
+
+        if (wait_count > 0) {
+                res = k_sem_take(r->submit_sem, K_FOREVER);
+                __ASSERT(res == 0,
+                         "semaphore was reset or timed out while waiting on completions!");
+        }
+#else
+        while (rtio_spsc_consumable(r->cq) < wait_count) {
+#ifdef CONFIG_BOARD_NATIVE_POSIX
+                k_busy_wait(1);
+#else
+                k_yield();
+#endif /* CONFIG_BOARD_NATIVE_POSIX */
+        }
+#endif
+
+        return res;
+}
+
+/**
+ * @brief Consume a single completion queue event if available
+ *
+ * If a completion queue event is returned, rtio_spsc_release(r->cq) must be
+ * called at some point to release the cqe spot for the cqe producer.
+ *
+ * @param r RTIO context
+ *
+ * @retval cqe A valid completion queue event consumed from the completion queue
+ * @retval NULL No completion queue event available
+ */
+static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
+{
+        return rtio_spsc_consume(r->cq);
+}
+
+/**
+ * @brief Wait for and consume a single completion queue event
+ *
+ * If a completion queue event is returned, rtio_spsc_release(r->cq) must be
+ * called at some point to release the cqe spot for the cqe producer.
+ *
+ * @param r RTIO context
+ *
+ * @retval cqe A valid completion queue event consumed from the completion queue
+ */
+static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
+{
+        struct rtio_cqe *cqe;
+
+        /* TODO is there a better way? reset this in submit? */
+#ifdef CONFIG_RTIO_CONSUME_SEM
+        k_sem_reset(r->consume_sem);
+#endif
+        cqe = rtio_spsc_consume(r->cq);
+
+        while (cqe == NULL) {
+                cqe = rtio_spsc_consume(r->cq);
+
+#ifdef CONFIG_RTIO_CONSUME_SEM
+                k_sem_take(r->consume_sem, K_FOREVER);
+#else
+                k_yield();
+#endif
+        }
+
+        return cqe;
+}
+
+/**
+ * @brief Inform the executor of a submission completion with success
+ *
+ * This may start the next asynchronous request if one is available.
+ *
+ * @param r RTIO context
+ * @param sqe Submission that has succeeded
+ * @param result Result of the request
+ */
+static inline void rtio_sqe_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
+{
+        r->executor->api->ok(r, sqe, result);
+}
+
+/**
+ * @brief Inform the executor of a submission's completion with error
+ *
+ * This SHALL fail the remaining submissions in the chain.
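+ * The simple executor, for example, completes each remaining chained
+ * submission with -ECANCELED.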
+ *
+ * @param r RTIO context
+ * @param sqe Submission that has failed
+ * @param result Result of the request
+ */
+static inline void rtio_sqe_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
+{
+        r->executor->api->err(r, sqe, result);
+}
+
+/**
+ * Submit a completion queue event with a given result and userdata
+ *
+ * Called by the executor to produce a completion queue event; no inherent
+ * locking is performed and this is not safe to do from multiple callers.
+ *
+ * @param r RTIO context
+ * @param result Integer result code (could be -errno)
+ * @param userdata Userdata to pass along to completion
+ */
+static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
+{
+        struct rtio_cqe *cqe = rtio_spsc_acquire(r->cq);
+
+        if (cqe == NULL) {
+                atomic_inc(&r->xcqcnt);
+        } else {
+                cqe->result = result;
+                cqe->userdata = userdata;
+                rtio_spsc_produce(r->cq);
+        }
+#ifdef CONFIG_RTIO_SUBMIT_SEM
+        if (r->submit_count > 0) {
+                r->submit_count--;
+                if (r->submit_count == 0) {
+                        k_sem_give(r->submit_sem);
+                }
+        }
+#endif
+#ifdef CONFIG_RTIO_CONSUME_SEM
+        k_sem_give(r->consume_sem);
+#endif
+}
+
+/* TODO add rtio_sqe_suspend() for suspending a submission chain that must
+ * wait on other in progress submissions or submission chains.
+ */
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_H_ */
diff --git a/include/zephyr/rtio/rtio_executor_simple.h b/include/zephyr/rtio/rtio_executor_simple.h
new file mode 100644
index 00000000000..d39a64f46ae
--- /dev/null
+++ b/include/zephyr/rtio/rtio_executor_simple.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2022 Intel Corporation.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_SIMPLE_H_
+#define ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_SIMPLE_H_
+
+#include
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @brief RTIO Simple Executor
+ *
+ * Provides the simplest possible executor without any concurrency
+ * or reinterpretation of requests.
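+ *
+ * Defining one and attaching it to an RTIO context is a one-liner each
+ * (a minimal sketch, names illustrative):
+ *
+ * @code
+ * RTIO_EXECUTOR_SIMPLE_DEFINE(my_exec);
+ * rtio_set_executor(&my_rtio, (struct rtio_executor *)&my_exec);
+ * @endcode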
+ *
+ * @defgroup rtio_executor_simple RTIO Simple Executor
+ * @ingroup rtio
+ * @{
+ */
+
+
+/**
+ * @brief Submit to the simple executor
+ *
+ * @param r RTIO context to submit
+ *
+ * @retval 0 always succeeds
+ */
+int rtio_simple_submit(struct rtio *r);
+
+/**
+ * @brief Report an SQE has completed successfully
+ *
+ * @param r RTIO context to use
+ * @param sqe RTIO SQE to report success
+ * @param result Result of the SQE
+ */
+void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result);
+
+/**
+ * @brief Report an SQE has completed with error
+ *
+ * @param r RTIO context to use
+ * @param sqe RTIO SQE to report error
+ * @param result Result of the SQE
+ */
+void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result);
+
+/**
+ * @brief Simple Executor
+ */
+struct rtio_simple_executor {
+        struct rtio_executor ctx;
+};
+
+/**
+ * @cond INTERNAL_HIDDEN
+ */
+static const struct rtio_executor_api z_rtio_simple_api = {
+        .submit = rtio_simple_submit,
+        .ok = rtio_simple_ok,
+        .err = rtio_simple_err
+};
+
+/**
+ * @endcond INTERNAL_HIDDEN
+ */
+
+
+/**
+ * @brief Define a simple executor with a given name
+ *
+ * @param name Symbol name, must be unique in the context in which it's used
+ */
+#define RTIO_EXECUTOR_SIMPLE_DEFINE(name) \
+        struct rtio_simple_executor name = { .ctx = { .api = &z_rtio_simple_api } };
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+
+#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_SIMPLE_H_ */
diff --git a/include/zephyr/rtio/rtio_spsc.h b/include/zephyr/rtio/rtio_spsc.h
new file mode 100644
index 00000000000..8714aec36dc
--- /dev/null
+++ b/include/zephyr/rtio/rtio_spsc.h
@@ -0,0 +1,258 @@
+/*
+ * Copyright (c) 2022 Intel Corporation
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+
+#ifndef ZEPHYR_RTIO_SPSC_H_
+#define ZEPHYR_RTIO_SPSC_H_
+
+#include
+#include
+#include
+
+/**
+ * @brief RTIO Single Producer Single Consumer (SPSC) Queue API
+ * @defgroup rtio_spsc RTIO SPSC API
+ * @ingroup rtio
+ * @{
+ */
+
+/**
+ * @file rtio_spsc.h
+ *
+ * @brief A lock-free and type safe power of 2 fixed size single producer
+ * single consumer (SPSC) queue using a ringbuffer and atomics to ensure
+ * coherency.
+ *
+ * This SPSC queue implementation works on an array which wraps using a power of
+ * two size and uses a bit mask to perform a modulus. Atomics are used to allow
+ * single-producer single-consumer safe semantics without locks. Elements are
+ * expected to be of a fixed size. The API is type safe as the underlying buffer
+ * is typed and all usage is done through macros.
+ *
+ * An SPSC queue may be declared on a stack or statically and work as intended
+ * so long as its lifetime outlives any usage. Static declarations should be
+ * the preferred method, as stack lifetimes are easy to get wrong. It is meant
+ * to be a shared object between two execution contexts (an ISR and a thread,
+ * for example).
+ *
+ * An SPSC queue is safe to produce or consume in an ISR with O(1) push/pull.
+ *
+ * @warning SPSC is *not* safe to produce or consume in multiple execution
+ * contexts.
+ *
+ * Safe usage would be, where A and B are unique execution contexts:
+ * 1. ISR A producing and a Thread B consuming.
+ * 2. Thread A producing and ISR B consuming.
+ * 3. Thread A producing and Thread B consuming.
+ * 4. ISR A producing and ISR B consuming.
+ */
+
+/**
+ * @private
+ * @brief Common SPSC attributes
+ *
+ * @warning Not to be manipulated without the macros!
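+ *
+ * For example, typical use through the public macros, with a producer and a
+ * consumer in separate contexts (a minimal sketch):
+ *
+ * @code
+ * RTIO_SPSC_DEFINE(myq, uint32_t, 4);
+ *
+ * // producer context
+ * uint32_t *slot = rtio_spsc_acquire(&myq);
+ *
+ * if (slot != NULL) {
+ *         *slot = 42;
+ *         rtio_spsc_produce(&myq);
+ * }
+ *
+ * // consumer context
+ * uint32_t *val = rtio_spsc_consume(&myq);
+ *
+ * if (val != NULL) {
+ *         // use *val, then free the slot for the producer
+ *         rtio_spsc_release(&myq);
+ * }
+ * @endcode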
+ */
+struct rtio_spsc {
+        /* private value only the producer thread should mutate */
+        unsigned long acquire;
+
+        /* private value only the consumer thread should mutate */
+        unsigned long consume;
+
+        /* producer mutable, consumer readable */
+        atomic_t in;
+
+        /* consumer mutable, producer readable */
+        atomic_t out;
+
+        /* mask used to automatically wrap values */
+        const unsigned long mask;
+};
+
+/**
+ * @brief Statically initialize an rtio_spsc
+ *
+ * @param name Name of the spsc symbol to be provided
+ * @param type Type stored in the spsc
+ * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
+ */
+#define RTIO_SPSC_INITIALIZER(name, type, sz) \
+        { ._spsc = { \
+                .acquire = 0, \
+                .consume = 0, \
+                .in = ATOMIC_INIT(0), \
+                .out = ATOMIC_INIT(0), \
+                .mask = sz - 1, \
+        } \
+        }
+
+/**
+ * @brief Declare an anonymous struct type for an rtio_spsc
+ *
+ * @param name Name of the spsc symbol to be provided
+ * @param type Type stored in the spsc
+ * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
+ */
+#define RTIO_SPSC_DECLARE(name, type, sz) \
+        struct rtio_spsc_ ## name { \
+                struct rtio_spsc _spsc; \
+                type buffer[sz]; \
+        }
+
+/**
+ * @brief Define an rtio_spsc with a fixed size
+ *
+ * @param name Name of the spsc symbol to be provided
+ * @param type Type stored in the spsc
+ * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
+ */
+#define RTIO_SPSC_DEFINE(name, type, sz) \
+        RTIO_SPSC_DECLARE(name, type, sz) name = \
+                RTIO_SPSC_INITIALIZER(name, type, sz);
+
+/**
+ * @brief Size of the SPSC queue
+ *
+ * @param spsc SPSC reference
+ */
+#define rtio_spsc_size(spsc) ((spsc)->_spsc.mask + 1)
+
+/**
+ * @private
+ * @brief A number modulo the spsc size, assumes power of 2
+ *
+ * @param spsc SPSC reference
+ * @param i Value to modulo to the size of the spsc
+ */
+#define z_rtio_spsc_mask(spsc, i) (i & (spsc)->_spsc.mask)
+
+/**
+ * @brief Initialize/reset a spsc such that it's empty
+ *
+ * Note that this is not safe to do while being used in a producer/consumer
+ * situation with multiple calling contexts (ISRs/threads).
+ *
+ * @param spsc SPSC to initialize/reset
+ */
+#define rtio_spsc_reset(spsc) \
+        ({ \
+                (spsc)->_spsc.consume = 0; \
+                (spsc)->_spsc.acquire = 0; \
+                atomic_set(&(spsc)->_spsc.in, 0); \
+                atomic_set(&(spsc)->_spsc.out, 0); \
+        })
+
+/**
+ * @brief Acquire an element to produce from the SPSC
+ *
+ * @param spsc SPSC to acquire an element from for producing
+ *
+ * @return A pointer to the acquired element or null if the spsc is full
+ */
+#define rtio_spsc_acquire(spsc) \
+        ({ \
+                uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.in) + (spsc)->_spsc.acquire; \
+                bool acq = \
+                        (idx - (uint32_t)atomic_get(&(spsc)->_spsc.out)) < rtio_spsc_size(spsc); \
+                if (acq) { \
+                        (spsc)->_spsc.acquire += 1; \
+                } \
+                acq ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
+        })
+
+/**
+ * @brief Produce one previously acquired element to the SPSC
+ *
+ * This makes one element available to the consumer immediately
+ *
+ * @param spsc SPSC to produce the previously acquired element or do nothing
+ */
+#define rtio_spsc_produce(spsc) \
+        ({ \
+                if ((spsc)->_spsc.acquire > 0) { \
+                        (spsc)->_spsc.acquire -= 1; \
+                        atomic_add(&(spsc)->_spsc.in, 1); \
+                } \
+        })
+
+/**
+ * @brief Produce all previously acquired elements to the SPSC
+ *
+ * This makes all previously acquired elements available to the consumer
+ * immediately
+ *
+ * @param spsc SPSC to produce all previously acquired elements or do nothing
+ */
+#define rtio_spsc_produce_all(spsc) \
+        ({ \
+                if ((spsc)->_spsc.acquire > 0) { \
+                        unsigned long acquired = (spsc)->_spsc.acquire; \
+                        (spsc)->_spsc.acquire = 0; \
+                        atomic_add(&(spsc)->_spsc.in, acquired); \
+                } \
+        })
+
+/**
+ * @brief Peek at an element from the spsc
+ *
+ * @param spsc SPSC to peek into
+ *
+ * @return Pointer to element or null if no consumable elements left
+ */
+#define rtio_spsc_peek(spsc) \
+        ({ \
+                uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
+                bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \
+                has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
+        })
+
+/**
+ * @brief Consume an element from the spsc
+ *
+ * @param spsc SPSC to consume from
+ *
+ * @return Pointer to element or null if no consumable elements left
+ */
+#define rtio_spsc_consume(spsc) \
+        ({ \
+                uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
+                bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \
+                if (has_consumable) { \
+                        (spsc)->_spsc.consume += 1; \
+                } \
+                has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
+        })
+
+/**
+ * @brief Release a consumed element
+ *
+ * @param spsc SPSC to release consumed element or do nothing
+ */
+#define rtio_spsc_release(spsc) \
+        ({ \
+                if ((spsc)->_spsc.consume > 0) { \
+                        (spsc)->_spsc.consume -= 1; \
+                        atomic_add(&(spsc)->_spsc.out, 1); \
+                } \
+        })
+
+/**
+ * @brief Count of consumables in spsc
+ *
+ * @param spsc SPSC to get item count for
+ */
+#define rtio_spsc_consumable(spsc) \
+        ({ \
+                (spsc)->_spsc.in \
+                        - (spsc)->_spsc.out \
+                        - (spsc)->_spsc.consume; \
+        })
+
+/**
+ * @}
+ */
+
+#endif /* ZEPHYR_RTIO_SPSC_H_ */
diff --git a/subsys/CMakeLists.txt b/subsys/CMakeLists.txt
index b468f018bb6..4318f3136e5 100644
--- a/subsys/CMakeLists.txt
+++ b/subsys/CMakeLists.txt
@@ -31,3 +31,4 @@ add_subdirectory_ifdef(CONFIG_TIMING_FUNCTIONS timing)
 add_subdirectory_ifdef(CONFIG_DEMAND_PAGING demand_paging)
 add_subdirectory(modbus)
 add_subdirectory(sd)
+add_subdirectory(rtio)
diff --git a/subsys/Kconfig b/subsys/Kconfig
index d8585aa8376..3f6830fa434 100644
--- a/subsys/Kconfig
+++ b/subsys/Kconfig
@@ -68,4 +68,6 @@ source "subsys/tracing/Kconfig"
 
 source "subsys/demand_paging/Kconfig"
 
+source "subsys/rtio/Kconfig"
+
 endmenu
diff --git a/subsys/rtio/CMakeLists.txt b/subsys/rtio/CMakeLists.txt
new file mode 100644
index 00000000000..d2732c1ecb5
--- /dev/null
+++ b/subsys/rtio/CMakeLists.txt
@@ -0,0 +1,14 @@
+# Copyright (c) 2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+if(CONFIG_RTIO)
+  zephyr_library()
+
+  zephyr_include_directories(${ZEPHYR_BASE}/subsys/rtio)
+
+  zephyr_library_sources_ifdef(
+    CONFIG_RTIO_EXECUTOR_SIMPLE
+    rtio_executor_simple.c
+  )
+
+endif()
diff --git a/subsys/rtio/Kconfig b/subsys/rtio/Kconfig
new file mode 100644
index 00000000000..7795128e411
--- /dev/null
+++ b/subsys/rtio/Kconfig
@@ -0,0 +1,39 @@
+# Copyright (c) 2022 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+menuconfig RTIO
+        bool "RTIO"
+
+if RTIO
+
+config RTIO_EXECUTOR_SIMPLE
+        bool "A simple executor for RTIO"
+        default y
+        help
+          A simple RTIO executor that will execute a queue of requested I/O
+          operations as if they are a single chain of submission queue entries.
+          This does not support concurrent chains.
+
+config RTIO_SUBMIT_SEM
+        bool "Use a semaphore when waiting for completions in rtio_submit"
+        help
+          When calling rtio_submit a semaphore is available to sleep the calling
+          thread for each completion queue event until the wait count is met.
+          This adds a small RAM overhead for a single semaphore. By default
+          rtio_submit will use polling on the completion queue with a k_yield()
+          in between iterations.
+
+config RTIO_CONSUME_SEM
+        bool "Use a semaphore when waiting for completions in rtio_cqe_consume_block"
+        help
+          When calling rtio_cqe_consume_block a semaphore is available to sleep
+          the calling thread for each completion queue event until the wait
+          count is met. This adds a small RAM overhead for a single semaphore.
+          By default the call will use polling on the completion queue with a
+          k_yield() in between iterations.
+
+module = RTIO
+module-str = RTIO
+module-help = Sets log level for RTIO support
+source "subsys/logging/Kconfig.template.log_config"
+
+endif
diff --git a/subsys/rtio/rtio_executor_simple.c b/subsys/rtio/rtio_executor_simple.c
new file mode 100644
index 00000000000..c06d09d95a9
--- /dev/null
+++ b/subsys/rtio/rtio_executor_simple.c
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022 Intel Corporation.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include
+#include
+#include
+
+#include
+LOG_MODULE_REGISTER(rtio_executor_simple, CONFIG_RTIO_LOG_LEVEL);
+
+
+/**
+ * @brief Submit submissions to the simple executor
+ *
+ * The simple executor provides no concurrency, instead executing
+ * each submission chain one after the next.
+ *
+ * @param r RTIO context
+ *
+ * @retval 0 Always succeeds
+ */
+int rtio_simple_submit(struct rtio *r)
+{
+        /* TODO For each submission queue entry chain,
+         * submit the chain to the first iodev
+         */
+        struct rtio_sqe *sqe = rtio_spsc_consume(r->sq);
+
+        if (sqe != NULL) {
+                LOG_DBG("Calling iodev submit");
+                rtio_iodev_submit(sqe, r);
+        }
+
+        return 0;
+}
+
+/**
+ * @brief Callback from an iodev describing success
+ */
+void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
+{
+        rtio_cqe_submit(r, result, sqe->userdata);
+        rtio_spsc_release(r->sq);
+        rtio_simple_submit(r);
+}
+
+/**
+ * @brief Callback from an iodev describing error
+ */
+void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
+{
+        struct rtio_sqe *nsqe;
+        bool chained;
+
+        rtio_cqe_submit(r, result, sqe->userdata);
+        chained = sqe->flags & RTIO_SQE_CHAINED;
+        rtio_spsc_release(r->sq);
+
+        if (chained) {
+
+                /* fail the remaining submissions in the chain */
+                nsqe = rtio_spsc_consume(r->sq);
+                while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) {
+                        rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
+                        rtio_spsc_release(r->sq);
+                        nsqe = rtio_spsc_consume(r->sq);
+                }
+
+                if (nsqe != NULL) {
+                        rtio_iodev_submit(nsqe, r);
+                }
+
+        } else {
+                /* Now we can submit the next in the queue if we aren't done */
+                rtio_simple_submit(r);
+        }
+}
diff --git a/tests/subsys/rtio/rtio_api/CMakeLists.txt b/tests/subsys/rtio/rtio_api/CMakeLists.txt
new file mode 100644
index 00000000000..d66cb7f869f
--- /dev/null
+++ b/tests/subsys/rtio/rtio_api/CMakeLists.txt
@@ -0,0 +1,14 @@
+# Copyright (c) 2021 Intel Corporation.
+# SPDX-License-Identifier: Apache-2.0
+
+cmake_minimum_required(VERSION 3.20.0)
+find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
+project(rtio_api_test)
+
+FILE(GLOB app_sources src/*.c)
+target_sources(app PRIVATE ${app_sources})
+
+target_include_directories(app PRIVATE
+  ${ZEPHYR_BASE}/include
+  ${ZEPHYR_BASE}/kernel/include
+  ${ZEPHYR_BASE}/arch/${ARCH}/include)
diff --git a/tests/subsys/rtio/rtio_api/prj.conf b/tests/subsys/rtio/rtio_api/prj.conf
new file mode 100644
index 00000000000..0cf4eb97b7e
--- /dev/null
+++ b/tests/subsys/rtio/rtio_api/prj.conf
@@ -0,0 +1,3 @@
+CONFIG_ZTEST=y
+CONFIG_LOG=y
+CONFIG_RTIO=y
diff --git a/tests/subsys/rtio/rtio_api/src/main.c b/tests/subsys/rtio/rtio_api/src/main.c
new file mode 100644
index 00000000000..9f9977afec0
--- /dev/null
+++ b/tests/subsys/rtio/rtio_api/src/main.c
@@ -0,0 +1,389 @@
+/*
+ * Copyright (c) 2021 Intel Corporation.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "rtio_iodev_test.h"
+
+/*
+ * @brief Produce and Consume a single uint32_t in the same execution context
+ *
+ * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release()
+ *
+ * @ingroup rtio_tests
+ */
+void test_produce_consume_size1(void)
+{
+        RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1);
+
+        const uint32_t magic = 43219876;
+
+        uint32_t *acq = rtio_spsc_acquire(&ezspsc);
+
+        zassert_not_null(acq, "Acquire should succeed");
+
+        *acq = magic;
+
+        uint32_t *acq2 = rtio_spsc_acquire(&ezspsc);
+
+        zassert_is_null(acq2, "Acquire should fail");
+
+        uint32_t *cons = rtio_spsc_consume(&ezspsc);
+
+        zassert_is_null(cons, "Consume should fail");
+
+        zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
+
+        rtio_spsc_produce(&ezspsc);
+
+        zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1");
+
+        uint32_t *cons2 = rtio_spsc_consume(&ezspsc);
+
+        zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
+
+        zassert_not_null(cons2, "Consume should not fail");
+        zassert_equal(*cons2, magic, "Consume value should equal magic");
+
+        uint32_t *cons3 = rtio_spsc_consume(&ezspsc);
+
+        zassert_is_null(cons3, "Consume should fail");
+
+
+        uint32_t *acq3 = rtio_spsc_acquire(&ezspsc);
+
+        zassert_is_null(acq3, "Acquire should not succeed");
+
+        rtio_spsc_release(&ezspsc);
+
+        uint32_t *acq4 = rtio_spsc_acquire(&ezspsc);
+
+        zassert_not_null(acq4, "Acquire should succeed");
+}
+
+/**
+ * @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking
+ * and wrap around reads/writes.
+ *
+ * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release()
+ *
+ * @ingroup rtio_tests
+ */
+void test_produce_consume_wrap_around(void)
+{
+        RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4);
+
+        for (int i = 0; i < 10; i++) {
+                zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
+                for (int j = 0; j < 3; j++) {
+                        uint32_t *entry = rtio_spsc_acquire(&ezspsc);
+
+                        zassert_not_null(entry, "Acquire should succeed");
+                        *entry = i * 3 + j;
+                        rtio_spsc_produce(&ezspsc);
+                }
+                zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3");
+
+                for (int k = 0; k < 3; k++) {
+                        uint32_t *entry = rtio_spsc_consume(&ezspsc);
+
+                        zassert_not_null(entry, "Consume should succeed");
+                        zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k");
+                        rtio_spsc_release(&ezspsc);
+                }
+
+                zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
+
+        }
+}
+
+/**
+ * @brief Ensure that integer wraps continue to work.
+ *
+ * Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough
+ * to ensure integer wraps occur.
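+ * Producing three elements advances the unsigned in/out counters past
+ * UINTPTR_MAX so they wrap, while the masked buffer index stays valid
+ * throughout.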
+ */
+void test_int_wrap_around(void)
+{
+        RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4);
+        ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2);
+        ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2);
+
+        for (int j = 0; j < 3; j++) {
+                uint32_t *entry = rtio_spsc_acquire(&ezspsc);
+
+                zassert_not_null(entry, "Acquire should succeed");
+                *entry = j;
+                rtio_spsc_produce(&ezspsc);
+        }
+
+        zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap");
+
+        for (int k = 0; k < 3; k++) {
+                uint32_t *entry = rtio_spsc_consume(&ezspsc);
+
+                zassert_not_null(entry, "Consume should succeed");
+                zassert_equal(*entry, k, "Consume value should equal k");
+                rtio_spsc_release(&ezspsc);
+        }
+
+        zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap");
+}
+
+#define MAX_RETRIES 5
+#define SMP_ITERATIONS 100
+
+RTIO_SPSC_DEFINE(spsc, uint32_t, 4);
+
+static void t1_consume(void *p1, void *p2, void *p3)
+{
+        struct rtio_spsc_spsc *ezspsc = p1;
+        uint32_t retries = 0;
+        uint32_t *val = NULL;
+
+        for (int i = 0; i < SMP_ITERATIONS; i++) {
+                val = NULL;
+                retries = 0;
+                while (val == NULL && retries < MAX_RETRIES) {
+                        val = rtio_spsc_consume(ezspsc);
+                        retries++;
+                }
+                if (val != NULL) {
+                        rtio_spsc_release(ezspsc);
+                } else {
+                        printk("consumer yield\n");
+                        k_yield();
+                }
+        }
+}
+
+static void t2_produce(void *p1, void *p2, void *p3)
+{
+        struct rtio_spsc_spsc *ezspsc = p1;
+        uint32_t retries = 0;
+        uint32_t *val = NULL;
+
+        for (int i = 0; i < SMP_ITERATIONS; i++) {
+                val = NULL;
+                retries = 0;
+                printk("producer acquiring\n");
+                while (val == NULL && retries < MAX_RETRIES) {
+                        val = rtio_spsc_acquire(ezspsc);
+                        retries++;
+                }
+                if (val != NULL) {
+                        *val = SMP_ITERATIONS;
+                        rtio_spsc_produce(ezspsc);
+                } else {
+                        printk("producer yield\n");
+                        k_yield();
+                }
+        }
+}
+
+#define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE)
+#define THREADS_NUM 2
+
+struct thread_info {
+        k_tid_t tid;
+        int executed;
+        int priority;
+        int cpu_id;
+};
+static struct thread_info tinfo[THREADS_NUM];
+static struct k_thread tthread[THREADS_NUM];
+static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE);
+
+
+/**
+ * @brief Test that the producer and consumer are indeed thread safe
+ *
+ * This can and should be validated on SMP machines where incoherent
+ * memory could cause issues.
+ */
+void test_spsc_threaded(void)
+{
+
+        tinfo[0].tid =
+                k_thread_create(&tthread[0], tstack[0], STACK_SIZE,
+                                (k_thread_entry_t)t1_consume,
+                                &spsc, NULL, NULL,
+                                K_PRIO_PREEMPT(5),
+                                K_INHERIT_PERMS, K_NO_WAIT);
+        tinfo[1].tid =
+                k_thread_create(&tthread[1], tstack[1], STACK_SIZE,
+                                (k_thread_entry_t)t2_produce,
+                                &spsc, NULL, NULL,
+                                K_PRIO_PREEMPT(5),
+                                K_INHERIT_PERMS, K_NO_WAIT);
+
+        k_thread_join(tinfo[1].tid, K_FOREVER);
+        k_thread_join(tinfo[0].tid, K_FOREVER);
+}
+
+
+RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec);
+RTIO_DEFINE(r_simple, (struct rtio_executor *)&simple_exec, 4, 4);
+struct rtio_iodev_test iodev_test_simple;
+
+/**
+ * @brief Test the basics of the RTIO API
+ *
+ * Ensures that we can setup an RTIO context, enqueue a request, and receive
+ * a completion event.
+ */
+void test_rtio_simple(void)
+{
+        int res;
+        uintptr_t userdata[2] = {0, 1};
+        struct rtio_sqe *sqe;
+        struct rtio_cqe *cqe;
+
+        rtio_iodev_test_init(&iodev_test_simple);
+
+        TC_PRINT("setting up single no-op\n");
+        sqe = rtio_spsc_acquire(r_simple.sq);
+        zassert_not_null(sqe, "Expected a valid sqe");
+        rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]);
+
+        TC_PRINT("submit with wait\n");
+        res = rtio_submit(&r_simple, 1);
+        zassert_ok(res, "Should return ok from rtio_submit");
+
+        cqe = rtio_spsc_consume(r_simple.cq);
+        zassert_not_null(cqe, "Expected a valid cqe");
+        zassert_ok(cqe->result, "Result should be ok");
+        zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back");
+        rtio_spsc_release(r_simple.cq);
+}
+
+RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec);
+RTIO_DEFINE(r_chain, (struct rtio_executor *)&chain_exec, 4, 4);
+struct rtio_iodev_test iodev_test_chain[2];
+
+/**
+ * @brief Test chained requests
+ *
+ * Ensures that we can setup an RTIO context, enqueue chained requests,
+ * and receive completion events in the correct order given the chained
+ * flag and multiple devices where serialization isn't guaranteed.
+ */
+void test_rtio_chain(void)
+{
+        int res;
+        uintptr_t userdata[4] = {0, 1, 2, 3};
+        struct rtio_sqe *sqe;
+        struct rtio_cqe *cqe;
+
+        for (int i = 0; i < 2; i++) {
+                rtio_iodev_test_init(&iodev_test_chain[i]);
+        }
+
+        for (int i = 0; i < 4; i++) {
+                sqe = rtio_spsc_acquire(r_chain.sq);
+                zassert_not_null(sqe, "Expected a valid sqe");
+                rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_chain[i % 2],
+                                  &userdata[i]);
+                sqe->flags |= RTIO_SQE_CHAINED;
+        }
+
+        res = rtio_submit(&r_chain, 4);
+        zassert_ok(res, "Should return ok from rtio_submit");
+        zassert_equal(rtio_spsc_consumable(r_chain.cq), 4, "Should have 4 pending completions");
+
+        for (int i = 0; i < 4; i++) {
+                TC_PRINT("consume %d\n", i);
+                cqe = rtio_spsc_consume(r_chain.cq);
+                zassert_not_null(cqe, "Expected a valid cqe");
+                zassert_ok(cqe->result, "Result should be ok");
+                zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions");
+                rtio_spsc_release(r_chain.cq);
+        }
+}
+
+RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec);
+RTIO_DEFINE(r_multi, (struct rtio_executor *)&multi_exec, 4, 4);
+struct rtio_iodev_test iodev_test_multi[2];
+
+/**
+ * @brief Test multiple asynchronous chains against one iodev
+ */
+void test_rtio_multiple_chains(void)
+{
+        int res;
+        uintptr_t userdata[4] = {0, 1, 2, 3};
+        struct rtio_sqe *sqe;
+        struct rtio_cqe *cqe;
+
+        for (int i = 0; i < 2; i++) {
+                rtio_iodev_test_init(&iodev_test_multi[i]);
+        }
+
+        for (int i = 0; i < 2; i++) {
+                for (int j = 0; j < 2; j++) {
+                        sqe = rtio_spsc_acquire(r_multi.sq);
+                        zassert_not_null(sqe, "Expected a valid sqe");
+                        rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_multi[i],
+                                          (void *)userdata[i*2 + j]);
+                        if (j == 0) {
+                                sqe->flags |= RTIO_SQE_CHAINED;
+                        } else {
+                                sqe->flags |= 0;
+                        }
+                }
+        }
+
+        TC_PRINT("calling submit from test case\n");
+        res = rtio_submit(&r_multi, 0);
+        zassert_ok(res, "Should return ok from rtio_submit");
+
+        bool seen[4] = { 0 };
+
+        TC_PRINT("waiting for 4 completions\n");
+        for (int i = 0; i < 4; i++) {
+                TC_PRINT("waiting on completion %d\n", i);
+                cqe = rtio_spsc_consume(r_multi.cq);
+
+                while (cqe == NULL) {
+                        k_sleep(K_MSEC(1));
+                        cqe = rtio_spsc_consume(r_multi.cq);
+                }
+
+                zassert_not_null(cqe, "Expected a valid cqe");
+                TC_PRINT("result %d, would block is %d, inval is %d\n",
+                         cqe->result, -EWOULDBLOCK, -EINVAL);
+                zassert_ok(cqe->result, "Result should be ok");
+                seen[(uintptr_t)cqe->userdata] = true;
+                if (seen[1]) {
+                        zassert_true(seen[0], "Should see 0 before 1");
+                }
+                if (seen[3]) {
+                        zassert_true(seen[2], "Should see 2 before 3");
+                }
+                rtio_spsc_release(r_multi.cq);
+        }
+}
+
+void test_main(void)
+{
+        ztest_test_suite(rtio_spsc_test,
+                         ztest_1cpu_unit_test(test_produce_consume_size1),
+                         ztest_1cpu_unit_test(test_produce_consume_wrap_around),
+                         ztest_1cpu_unit_test(test_int_wrap_around),
+                         ztest_unit_test(test_spsc_threaded),
+                         ztest_unit_test(test_rtio_simple),
+                         ztest_unit_test(test_rtio_chain),
+                         ztest_unit_test(test_rtio_multiple_chains)
+                         );
+
+        ztest_run_test_suite(rtio_spsc_test);
+}
diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.c b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.c
new file mode 100644
index 00000000000..3ec17b5cdbe
--- /dev/null
+++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.c
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2022 Intel Corporation.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+
+#include
+#include
+#include "rtio_iodev_test.h"
+
+static void rtio_iodev_timer_fn(struct k_timer *tm)
+{
+        struct rtio_iodev_test *iodev = CONTAINER_OF(tm, struct rtio_iodev_test, timer);
+
+        struct rtio *r = iodev->r;
+        const struct rtio_sqe *sqe = iodev->sqe;
+
+        iodev->r = NULL;
+        iodev->sqe = NULL;
+
+        /* Complete the request with Ok and a result */
+        printk("sqe ok callback\n");
+        rtio_sqe_ok(r, sqe, 0);
+}
+
+static void rtio_iodev_test_submit(const struct rtio_sqe *sqe, struct rtio *r)
+{
+        struct rtio_iodev_test *iodev = (struct rtio_iodev_test *)sqe->iodev;
+
+        /**
+         * TODO this isn't quite right, probably should be equivalent to a
+         * pend instead of a fail here. The submission chain on this iodev
+         * needs to wait until the iodev is available again, which should be
+         * checked after each sqe using the iodev completes. A smart
+         * executor then should have, much like a thread scheduler, a pend
+         * list that it checks against on each completion.
+         */
+        if (k_timer_remaining_get(&iodev->timer) != 0) {
+                printk("would block, timer not free!\n");
+                rtio_sqe_err(r, sqe, -EWOULDBLOCK);
+                return;
+        }
+
+        iodev->sqe = sqe;
+        iodev->r = r;
+
+        /**
+         * Simulate an async hardware request with a one shot timer
+         *
+         * In reality the time to complete might have some significant variance
+         * but this is proof enough of a working API flow.
+         *
+         * TODO enable setting the time here in some way
+         */
+        printk("starting one shot\n");
+        k_timer_start(&iodev->timer, K_MSEC(10), K_NO_WAIT);
+}
+
+const struct rtio_iodev_api rtio_iodev_test_api = {
+        .submit = rtio_iodev_test_submit,
+};
+
+void rtio_iodev_test_init(struct rtio_iodev_test *test)
+{
+        test->iodev.api = &rtio_iodev_test_api;
+        k_timer_init(&test->timer, rtio_iodev_timer_fn, NULL);
+}
diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h
new file mode 100644
index 00000000000..f7e8132638a
--- /dev/null
+++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 Intel Corporation.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include
+#include
+
+#ifndef RTIO_IODEV_TEST_H_
+#define RTIO_IODEV_TEST_H_
+
+/*
+ * @brief A simple asynchronous testable iodev
+ */
+struct rtio_iodev_test {
+        /**
+         * io device struct as the first member, makes this an rtio_iodev
+         */
+        struct rtio_iodev iodev;
+
+        /**
+         * k_timer for an asynchronous task
+         */
+        struct k_timer timer;
+
+        /**
+         * Currently executing sqe
+         */
+        const struct rtio_sqe *sqe;
+
+        /**
+         * Currently executing rtio context
+         */
+        struct rtio *r;
+};
+
+void rtio_iodev_test_init(struct rtio_iodev_test *test);
+
+#endif /* RTIO_IODEV_TEST_H_ */
diff --git a/tests/subsys/rtio/rtio_api/testcase.yaml b/tests/subsys/rtio/rtio_api/testcase.yaml
new file mode 100644
index 00000000000..b6b625fa72f
--- /dev/null
+++ b/tests/subsys/rtio/rtio_api/testcase.yaml
@@ -0,0 +1,7 @@
+tests:
+  subsys.rtio.api:
+    tags: rtio
+  subsys.rtio.api.submit_sem:
+    tags: rtio
+    extra_configs:
+      - CONFIG_RTIO_SUBMIT_SEM=y