rtio: Real-Time Input/Output Stream

A DMA friendly Stream API for zephyr. Based on ideas from io_uring
and iio, a queue based API for I/O operations.

Provides a pair of fixed length ringbuffer backed queues for submitting
I/O requests and recieving I/O completions. The requests may be chained
together to ensure the next operation does not start until the current
one is complete.

Requests target an abstract rtio_iodev which is expected to wrap all
the hardware particulars of how to perform the operation. For example
with a SPI bus device, a description of what a read, and write mean
can be decided by the iodev wrapping a particular device
hanging off of a SPI controller.

The queue pair are submitted to an executor which may be a simple
inplace looping executor done in the callers execution context
(thread/stack) but other executors are expected. A threadpool executor
might for example allow for concurrent request chains to execute in
parallel. A DMA executor, in conjunction with DMA aware iodevs
would allow for hardware offloading of operations going so far as to
schedule with priority using hardware arbitration.

Both the iodev and executor are definable by a particular
SoC, meaning they can work in conjuction to perform IO operations
using a particular DMA controller or methodology if desired.

The application decides entirely how large the queues are, where
the buffers to read/write come from (some executors
may have particular demands!), and which executor to submit
requests to.

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
This commit is contained in:
Tom Burdick 2019-06-26 10:17:18 -05:00 committed by Anas Nashif
commit 3d2ead38cb
18 changed files with 2067 additions and 0 deletions

View file

@ -615,6 +615,7 @@
/include/zephyr/posix/ @pfalcon
/include/zephyr/pm/pm.h @nashif @ceolin
/include/zephyr/drivers/ptp_clock.h @tbursztyka
/include/zephyr/rtio/ @teburd
/include/zephyr/shared_irq.h @dcpleung @nashif @andyross
/include/zephyr/shell/ @jakub-uC @nordic-krch
/include/zephyr/shell/shell_mqtt.h @ycsin
@ -815,6 +816,7 @@ scripts/gen_image_info.py @tejlmand
/tests/subsys/debug/coredump/ @dcpleung
/tests/subsys/fs/ @nashif @nvlsianpu @de-nordic
/tests/subsys/sd/ @danieldegrasse
/tests/subsys/rtio/ @teburd
/tests/subsys/settings/ @nvlsianpu
/tests/subsys/shell/ @jakub-uC @nordic-krch
# Get all docs reviewed

View file

@ -27,4 +27,5 @@ OS Services
tfm/index
usb/index.rst
virtualization/index.rst
rtio/index.rst
misc.rst

431
doc/services/rtio/index.rst Normal file
View file

@ -0,0 +1,431 @@
.. _rtio_api:
Real Time I/O (RTIO)
####################
.. contents::
:local:
:depth: 2
.. image:: rings.png
:width: 800
:alt: Submissions and Completion Ring Queues
RTIO provides a framework for doing asynchronous operation chains with event
driven I/O. This section covers the RTIO API, queues, executor, iodev,
and common usage patterns with peripheral devices.
RTIO takes a lot of inspiration from Linux's io_uring in its operations and API
as that API matches up well with hardware DMA transfer queues and descriptions.
A quick sales pitch on why RTIO works well in many scenarios:
1. API is DMA and interrupt friendly
2. No buffer copying
3. No callbacks
4. Blocking or non-blocking operation
Problem
*******
An application wishing to do complex DMA or interrupt driven operations today
in Zephyr requires direct knowledge of the hardware and how it works. There is
no understanding in the DMA API of other Zephyr devices and how they relate.
This means doing complex audio, video, or sensor streaming requires direct
hardware knowledge or leaky abstractions over DMA controllers. Neither is ideal.
To enable asynchronous operations, especially with DMA, a description of what
to do rather than direct operations through C and callbacks is needed. Enabling
DMA features such as channels with priority, and sequences of transfers requires
more than a simple list of descriptions.
Using DMA and/or interrupt driven I/O shouldn't dictate whether or not the
call is blocking or not.
Inspiration, introducing io_uring
*********************************
It's better not to reinvent the wheel (or ring in this case) and io_uring as an
API from the Linux kernel provides a winning model. In io_uring there are two
lock-free ring buffers acting as queues shared between the kernel and a userland
application. One queue for submission entries which may be chained and flushed to
create concurrent sequential requests. A second queue for completion queue events.
Only a single syscall is actually required to execute many operations, the
io_uring_submit call. This call may block the caller when a number of
operations to wait on is given.
This model maps well to DMA and interrupt driven transfers. A request to do a
sequence of operations in an asynchronous way directly relates
to the way hardware typically works with interrupt driven state machines
potentially involving multiple peripheral IPs like bus and DMA controllers.
Submission Queue and Chaining
*****************************
The submission queue (sq), is the description of the operations
to perform in concurrent chains.
For example imagine a typical SPI transfer where you wish to write a
register address to then read from. So the sequence of operations might be...
1. Chip Select
2. Clock Enable
3. Write register address into SPI transmit register
4. Read from the SPI receive register into a buffer
5. Disable clock
6. Disable Chip Select
If anything in this chain of operations fails give up. Some of those operations
can be embodied in a device abstraction that understands a read or write
implicitly means setup the clock and chip select. The transactional nature of
the request also needs to be embodied in some manner. Of the operations above
perhaps the read could be done using DMA as its large enough make sense. That
requires an understanding of how to setup the device's particular DMA to do so.
The above sequence of operations is embodied in RTIO as chain of
submission queue entries (sqe). Chaining is done by setting a bitflag in
an sqe to signify the next sqe must wait on the current one.
Because the chip select and clocking is common to a particular SPI controller
and device on the bus it is embodied in what RTIO calls an iodev.
Multiple operations against the same iodev are done in the order provided as
soon as possible. If two operation chains have varying points using the same
device its possible one chain will have to wait for another to complete.
Completion Queue
****************
In order to know when a sqe has completed there is a completion
queue (cq) with completion queue events (cqe). A sqe once completed results in
a cqe being pushed into the cq. The ordering of cqe may not be the same order of
sqe. A chain of sqe will however ensure ordering and failure cascading.
Other potential schemes are possible but a completion queue is a well trod
idea with io_uring and other similar operating system APIs.
Executor and IODev
******************
Turning submission queue entries (sqe) into completion queue events (cqe) is the
job of objects implementing the executor and iodev APIs. These APIs enable
coordination between themselves to enable things like DMA transfers.
The end result of these APIs should be a method to resolve the request by
deciding some of the following questions with heuristic/constraint
based decision making.
* Polling, Interrupt, or DMA transfer?
* If DMA, are the requirements met (peripheral supported by DMAC, etc).
The executor is meant to provide policy for when to use each transfer
type, and provide the common code for walking through submission queue
chains by providing calls the iodev may use to signal completion,
error, or a need to suspend and wait.
Outstanding Questions
*********************
RTIO is not a complete API and solution, and is currently evolving to best
fit the nature of an RTOS. The general ideas behind a pair of queues to
describe requests and completions seems sound and has been proven out in
other contexts. Questions remain though.
Timeouts and Deadlines
======================
Timeouts and deadlines are key to being Real-Time. Real-Time in Zephyr means
being able to do things when an application wants them done. That could mean
different things from a deadline with best effort attempts or a timeout and
failure.
These features would surely be useful in many cases, but would likely add some
significant complexities. It's something to decide upon, and even if enabled
would likely be a compile time optional feature leading to complex testing.
Cancellation
============
Canceling an already queued operation could be possible with a small
API addition to perhaps take both the RTIO context and a pointer to the
submission queue entry. However, cancellation as an API induces many potential
complexities that might not be appropriate. It's something to be decided upon.
Userspace Support
=================
RTIO with userspace is certainly plausible but would require the equivalent of
a memory map call to map the shared ringbuffers and also potentially dma buffers.
Additionally a DMA buffer interface would likely need to be provided for
coherence and MMU usage.
IODev and Executor API
======================
Lastly the API between an executor and iodev is incomplete.
There are certain interactions that should be supported. Perhaps things like
expanding a submission queue entry into multiple submission queue entries in
order to split up work that can be done by a device and work that can be done
by a DMA controller.
In some SoCs only specific DMA channels may be used with specific devices. In
others there are requirements around needing a DMA handshake or specific
triggering setups to tell the DMA when to start its operation.
None of that, from the outward facing API, is an issue.
It is however an unresolved task and issue from an internal API between the
executor and iodev. This requires some SoC specifics and enabling those
generically isn't likely possible. That's ok, an iodev and dma executor should
be vendor specific, but an API needs to be there between them that is not!
Special Hardware: Intel HDA
===========================
In some cases there's a need to always do things in a specific order
with a specific buffer allocation strategy. Consider a DMA that *requires*
the usage of a circular buffer segmented into blocks that may only be
transferred one after another. This is the case of the Intel HDA stream for
audio.
In this scenario the above API can still work, but would require an additional
buffer allocator to work with fixed sized segments.
When to Use
***********
It's important to understand when DMA like transfers are useful and when they
are not. It's a poor idea to assume that something made for high throughput will
work for you. There is a computational, memory, and latency cost to setup the
description of transfers.
Polling at 1Hz an air sensor will almost certainly result in a net negative
result compared to ad-hoc sensor (i2c/spi) requests to get the sample.
Continuous transfers, driven by timer or interrupt, of data from a peripheral's
on board FIFO over I2C, I3C, SPI, MIPI, I2S, etc... maybe, but not always!
Examples
********
Examples speak loudly about the intended uses and goals of an API. So several key
examples are presented below. Some are entirely plausible today without a
big leap. Others (the sensor example) would require additional work in other
APIs outside of RTIO as a sub system and are theoretical.
Chained Blocking Requests
=========================
A common scenario is needing to write the register address to then read from.
This can be accomplished by chaining a write into a read operation.
The transaction on i2c is implicit for each operation chain.
.. code-block:: C
RTIO_I2C_IODEV(i2c_dev, I2C_DT_SPEC_INST(n));
RTIO_DEFINE(ez_io, 4, 4);
static uint16_t reg_addr;
static uint8_t buf[32];
int do_some_io(void)
{
struct rtio_sqe *write_sqe = rtio_spsc_acquire(ez_io.sq);
struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
rtio_sqe_prep_write(write_sqe, i2c_dev, RTIO_PRIO_LOW, &reg_addr, 2);
write_sqe->flags = RTIO_SQE_CHAINED; /* the next item in the queue will wait on this one */
rtio_sqe_prep_read(read_sqe, i2c_dev, RTIO_PRIO_LOW, buf, 32);
rtio_submit(rtio_inplace_executor, &ez_io, 2);
struct rtio_cqe *read_cqe = rtio_spsc_consume(ez_io.cq);
struct rtio_cqe *write_cqe = rtio_spsc_consume(ez_io.cq);
if(read_cqe->result < 0) {
LOG_ERR("read failed!");
}
if(write_cqe->result < 0) {
LOG_ERR("write failed!");
}
rtio_spsc_release(ez_io.cq);
rtio_spsc_release(ez_io.cq);
}
Non blocking device to device
=============================
Imagine wishing to read from one device on an I2C bus and then write the same
buffer to a device on a SPI bus without blocking the thread or setting up
callbacks or other IPC notification mechanisms.
Perhaps an I2C temperature sensor and a SPI lowrawan module. The following is a
simplified version of that potential operation chain.
.. code-block:: C
RTIO_I2C_IODEV(i2c_dev, I2C_DT_SPEC_INST(n));
RTIO_SPI_IODEV(spi_dev, SPI_DT_SPEC_INST(m));
RTIO_DEFINE(ez_io, 4, 4);
static uint8_t buf[32];
int do_some_io(void)
{
uint32_t read, write;
struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
rtio_sqe_prep_read(read_sqe, i2c_dev, RTIO_PRIO_LOW, buf, 32);
read_sqe->flags = RTIO_SQE_CHAINED; /* the next item in the queue will wait on this one */
/* Safe to do as the chained operation *ensures* that if one fails all subsequent ops fail */
struct rtio_sqe *write_sqe = rtio_spsc_acquire(ez_io.sq);
rtio_sqe_prep_write(write_sqe, spi_dev, RTIO_PRIO_LOW, buf, 32);
/* call will return immediately without blocking if possible */
rtio_submit(rtio_inplace_executor, &ez_io, 0);
/* These calls might return NULL if the operations have not yet completed! */
for (int i = 0; i < 2; i++) {
struct rtio_cqe *cqe = rtio_spsc_consume(ez_io.cq);
while(cqe == NULL) {
cqe = rtio_spsc_consume(ez_io.cq);
k_yield();
}
if(cqe->userdata == &read && cqe->result < 0) {
LOG_ERR("read from i2c failed!");
}
if(cqe->userdata == &write && cqe->result < 0) {
LOG_ERR("write to spi failed!");
}
/* Must release the completion queue event after consume */
rtio_spsc_release(ez_io.cq);
}
}
Nested iodevs for Devices on Buses (Sensors), Theoretical
=========================================================
Consider a device like a sensor or audio codec sitting on a bus.
Its useful to consider that the sensor driver can use RTIO to do I/O on the SPI
bus, while also being an RTIO device itself. The sensor iodev can set aside a
small portion of the buffer in front or in back to store some metadata describing
the format of the data. This metadata could then be used in creating a sensor
readings iterator which lazily lets you map over each reading, doing
calculations such as FIR/IIR filtering, or perhaps translating the readings into
other numerical formats with useful measurement units such as SI. RTIO is a
common movement API and allows for such uses while not deciding the mechanism.
This same sort of setup could be done for other data streams such as audio or
video.
.. code-block:: C
/* Note that the sensor device itself can use RTIO to get data over I2C/SPI
* potentially with DMA, but we don't need to worry about that here
* All we need to know is the device tree node_id and that it can be an iodev
*/
RTIO_SENSOR_IODEV(sensor_dev, DEVICE_DT_GET(DT_NODE(super6axis));
RTIO_DEFINE(ez_io, 4, 4);
/* The sensor driver decides the minimum buffer size for us, we decide how
* many bufs. This could be a typical multiple of a fifo packet the sensor
* produces, ICM42688 for example produces a FIFO packet of 20 bytes in
* 20bit mode at 32KHz so perhaps we'd like to get 4 buffers of 4ms of data
* each in this setup to process on. and its already been defined here for us.
*/
#include <sensors/icm42688_p.h>
static uint8_t bufs[4][ICM42688_RTIO_BUF_SIZE];
int do_some_sensors(void) {
/* Obtain a dmac executor from the DMA device */
struct device *dma = DEVICE_DT_GET(DT_NODE(dma0));
const struct rtio_executor *rtio_dma_exec =
dma_rtio_executor(dma);
/*
* Set the executor for our queue context
*/
rtio_set_executor(ez_io, rtio_dma_exec);
/* Mostly we want to feed the sensor driver enough buffers to fill while
* we wait and process! Small enough to process quickly with low latency,
* big enough to not spend all the time setting transfers up.
*
* It's assumed here that the sensor has been configured already
* and each FIFO watermark interrupt that occurs it attempts
* to pull from the queue, fill the buffer with a small metadata
* offset using its own rtio request to the SPI bus using DMA.
*/
for(int i = 0; i < 4; i++) {
struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
rtio_sqe_prep_read(read_sqe, sensor_dev, RTIO_PRIO_HIGH, bufs[i], ICM42688_RTIO_BUF_SIZE);
}
struct device *sensor = DEVICE_DT_GET(DT_NODE(super6axis));
struct sensor_reader reader;
struct sensor_channels channels[4] = {
SENSOR_TIMESTAMP_CHANNEL,
SENSOR_CHANNEL(int32_t, SENSOR_ACC_X, 0, SENSOR_RAW),
SENSOR_CHANNEL(int32_t SENSOR_ACC_Y, 0, SENSOR_RAW),
SENSOR_CHANNEL(int32_t, SENSOR_ACC_Z, 0, SENSOR_RAW),
};
while (true) {
/* call will wait for one completion event */
rtio_submit(ez_io, 1);
struct rtio_cqe *cqe = rtio_spsc_consume(ez_io.cq);
if(cqe->result < 0) {
LOG_ERR("read failed!");
goto next;
}
/* Bytes read into the buffer */
int32_t bytes_read = cqe->result;
/* Retrieve soon to be reusable buffer pointer from completion */
uint8_t *buf = cqe->userdata;
/* Get an iterator (reader) that obtains sensor readings in integer
* form, 16 bit signed values in the native sensor reading format
*/
res = sensor_reader(sensor, buf, cqe->result, &reader, channels,
sizeof(channels));
__ASSERT(res == 0);
while(sensor_reader_next(&reader)) {
printf("time(raw): %d, acc (x,y,z): (%d, %d, %d)\n",
channels[0].value.u32, channels[1].value.i32,
channels[2].value.i32, channels[3].value.i32);
}
next:
/* Release completion queue event */
rtio_spsc_release(ez_io.cq);
/* resubmit a read request with the newly freed buffer to the sensor */
struct rtio_sqe *read_sqe = rtio_spsc_acquire(ez_io.sq);
rtio_sqe_prep_read(read_sqe, sensor_dev, RTIO_PRIO_HIGH, buf, ICM20649_RTIO_BUF_SIZE);
}
}
API Reference
*************
RTIO API
========
.. doxygengroup:: rtio_api
RTIO SPSC API
=============
.. doxygengroup:: rtio_spsc

BIN
doc/services/rtio/rings.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 89 KiB

627
include/zephyr/rtio/rtio.h Normal file
View file

@ -0,0 +1,627 @@
/*
* Copyright (c) 2022 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @file
* @brief Real-Time IO device API for moving bytes with low effort
*
* RTIO uses a SPSC lock-free queue pair to enable a DMA and ISR friendly I/O API.
*
* I/O like operations are setup in a pre-allocated queue with a fixed number of
* submission requests. Each submission request takes the device to operate on
* and an operation. The rest is information needed to perform the operation such
* as a register or mmio address of the device, a source/destination buffers
* to read from or write to, and other pertinent information.
*
* These operations may be chained in a such a way that only when the current
* operation is complete will the next be executed. If the current request fails
* all chained requests will also fail.
*
* The completion of these requests are pushed into a fixed size completion
* queue which an application may actively poll for completions.
*
* An executor (could use a dma controller!) takes the queues and determines
* how to perform each requested operation. By default there is a software
* executor which does all operations in software using software device
* APIs.
*/
#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_H_
#define ZEPHYR_INCLUDE_RTIO_RTIO_H_
#include <zephyr/rtio/rtio_spsc.h>
#include <zephyr/sys/__assert.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/device.h>
#include <zephyr/kernel.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief RTIO
* @defgroup rtio RTIO
* @{
* @}
*/
struct rtio_iodev;
/**
* @brief RTIO API
* @defgroup rtio_api RTIO API
* @ingroup rtio
* @{
*/
/**
* @brief RTIO Predefined Priorties
* @defgroup rtio_sqe_prio RTIO Priorities
* @ingroup rtio_api
* @{
*/
/**
* @brief Low priority
*/
#define RTIO_PRIO_LOW 0U
/**
* @brief Normal priority
*/
#define RTIO_PRIO_NORM 127U
/**
* @brief High priority
*/
#define RTIO_PRIO_HIGH 255U
/**
* @}
*/
/**
* @brief RTIO SQE Flags
* @defgroup rtio_sqe_flags RTIO SQE Flags
* @ingroup rtio_api
* @{
*/
/**
* @brief The next request in the queue should wait on this one.
*/
#define RTIO_SQE_CHAINED BIT(0)
/**
* @}
*/
/**
* @brief A submission queue event
*/
struct rtio_sqe {
uint8_t op; /**< Op code */
uint8_t prio; /**< Op priority */
uint16_t flags; /**< Op Flags */
struct rtio_iodev *iodev; /**< Device to operation on */
/**
* User provided pointer to data which is returned upon operation
* completion
*
* If unique identification of completions is desired this should be
* unique as well.
*/
void *userdata;
union {
struct {
uint32_t buf_len; /**< Length of buffer */
uint8_t *buf; /**< Buffer to use*/
};
};
};
/**
* @brief Submission queue
*
* This is used for typifying the members of an RTIO queue pair
* but nothing more.
*/
struct rtio_sq {
struct rtio_spsc _spsc;
struct rtio_sqe buffer[];
};
/**
* @brief A completion queue event
*/
struct rtio_cqe {
int32_t result; /**< Result from operation */
void *userdata; /**< Associated userdata with operation */
};
/**
* @brief Completion queue
*
* This is used for typifying the members of an RTIO queue pair
* but nothing more.
*/
struct rtio_cq {
struct rtio_spsc _spsc;
struct rtio_cqe buffer[];
};
struct rtio;
struct rtio_executor_api {
/**
* @brief Submit the request queue to executor
*
* The executor is responsible for interpreting the submission queue and
* creating concurrent execution chains.
*
* Concurrency is optional and implementation dependent.
*/
int (*submit)(struct rtio *r);
/**
* @brief SQE completes successfully
*/
void (*ok)(struct rtio *r, const struct rtio_sqe *sqe, int result);
/**
* @brief SQE fails to complete
*/
void (*err)(struct rtio *r, const struct rtio_sqe *sqe, int result);
};
/**
* @brief An executor does the work of executing the submissions.
*
* This could be a DMA controller backed executor, thread backed,
* or simple in place executor.
*
* A DMA executor might schedule all transfers with priorities
* and use hardware arbitration.
*
* A threaded executor might use a thread pool where each transfer
* chain is executed across the thread pool and the priority of the
* transfer is used as the thread priority.
*
* A simple in place exector might simply loop over and execute each
* transfer in the calling threads context. Priority is entirely
* derived from the calling thread then.
*
* An implementation of the executor must place this struct as
* its first member such that pointer aliasing works.
*/
struct rtio_executor {
const struct rtio_executor_api *api;
};
/**
* @brief An RTIO queue pair that both the kernel and application work with
*
* The kernel is the consumer of the submission queue, and producer of the completion queue.
* The application is the consumer of the completion queue and producer of the submission queue.
*
* Nothing is done until a call is performed to do the work (rtio_execute).
*/
struct rtio {
/*
* An executor which does the job of working through the submission
* queue.
*/
struct rtio_executor *executor;
#ifdef CONFIG_RTIO_SUBMIT_SEM
/* A wait semaphore which may suspend the calling thread
* to wait for some number of completions when calling submit
*/
struct k_sem *submit_sem;
uint32_t submit_count;
#endif
#ifdef CONFIG_RTIO_CONSUME_SEM
/* A wait semaphore which may suspend the calling thread
* to wait for some number of completions while consuming
* them from the completion queue
*/
struct k_sem *consume_sem;
#endif
/* Number of completions that were unable to be submitted with results
* due to the cq spsc being full
*/
atomic_t xcqcnt;
/* Submission queue */
struct rtio_sq *sq;
/* Completion queue */
struct rtio_cq *cq;
};
/**
* @brief API that an RTIO IO device should implement
*/
struct rtio_iodev_api {
/**
* @brief Submission function for a request to the iodev
*
* The iodev is responsible for doing the operation described
* as a submission queue entry and reporting results using using
* `rtio_sqe_ok` or `rtio_sqe_err` once done.
*/
void (*submit)(const struct rtio_sqe *sqe,
struct rtio *r);
/**
* TODO some form of transactional piece is missing here
* where we wish to "transact" on an iodev with multiple requests
* over a chain.
*
* Only once explicitly released or the chain fails do we want
* to release. Once released any pending iodevs in the queue
* should be done.
*
* Something akin to a lock/unlock pair.
*/
};
/* IO device submission queue entry */
struct rtio_iodev_sqe {
const struct rtio_sqe *sqe;
struct rtio *r;
};
/**
* @brief IO device submission queue
*
* This is used for reifying the member of the rtio_iodev struct
*/
struct rtio_iodev_sq {
struct rtio_spsc _spsc;
struct rtio_iodev_sqe buffer[];
};
/**
* @brief An IO device with a function table for submitting requests
*
* This is required to be the first member of every iodev. There's a strong
* possibility this will be extended with some common data fields (statistics)
* in the future.
*/
struct rtio_iodev {
/* Function pointer table */
const struct rtio_iodev_api *api;
/* Queue of RTIO contexts with requests */
struct rtio_iodev_sq *iodev_sq;
};
/** An operation that does nothing and will complete immediately */
#define RTIO_OP_NOP 0
/** An operation that receives (reads) */
#define RTIO_OP_RX 1
/** An operation that transmits (writes) */
#define RTIO_OP_TX 2
/**
* @brief Prepare a nop (no op) submission
*/
static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe,
struct rtio_iodev *iodev,
void *userdata)
{
sqe->op = RTIO_OP_NOP;
sqe->iodev = iodev;
sqe->userdata = userdata;
}
/**
* @brief Prepare a read op submission
*/
static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
struct rtio_iodev *iodev,
int8_t prio,
uint8_t *buf,
uint32_t len,
void *userdata)
{
sqe->op = RTIO_OP_RX;
sqe->prio = prio;
sqe->iodev = iodev;
sqe->buf_len = len;
sqe->buf = buf;
sqe->userdata = userdata;
}
/**
* @brief Prepare a write op submission
*/
static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
struct rtio_iodev *iodev,
int8_t prio,
uint8_t *buf,
uint32_t len,
void *userdata)
{
sqe->op = RTIO_OP_TX;
sqe->prio = prio;
sqe->iodev = iodev;
sqe->buf_len = len;
sqe->buf = buf;
sqe->userdata = userdata;
}
/**
* @brief Statically define and initialize a fixed length submission queue.
*
* @param name Name of the submission queue.
* @param len Queue length, power of 2 required (2, 4, 8).
*/
#define RTIO_SQ_DEFINE(name, len) \
static RTIO_SPSC_DEFINE(name, struct rtio_sqe, len)
/**
* @brief Statically define and initialize a fixed length completion queue.
*
* @param name Name of the completion queue.
* @param len Queue length, power of 2 required (2, 4, 8).
*/
#define RTIO_CQ_DEFINE(name, len) \
static RTIO_SPSC_DEFINE(name, struct rtio_cqe, len)
/**
* @brief Statically define and initialize an RTIO context
*
* @param name Name of the RTIO
* @param exec Symbol for rtio_executor (pointer)
* @param sq_sz Size of the submission queue, must be power of 2
* @param cq_sz Size of the completion queue, must be power of 2
*/
#define RTIO_DEFINE(name, exec, sq_sz, cq_sz) \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \
IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (K_SEM_DEFINE(_consume_sem_##name, 0, 1))) \
RTIO_SQ_DEFINE(_sq_##name, sq_sz); \
RTIO_CQ_DEFINE(_cq_##name, cq_sz); \
static struct rtio name = { \
.executor = (exec), \
.xcqcnt = ATOMIC_INIT(0), \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_sem = &_submit_sem_##name,)) \
IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_count = 0,)) \
IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (.consume_sem = &_consume_sem_##name,)) \
.sq = (struct rtio_sq *const)&_sq_##name, \
.cq = (struct rtio_cq *const)&_cq_##name, \
}
/**
* @brief Set the executor of the rtio context
*/
static inline void rtio_set_executor(struct rtio *r, struct rtio_executor *exc)
{
r->executor = exc;
}
/**
* @brief Perform a submitted operation with an iodev
*
* @param sqe Submission to work on
* @param r RTIO context
*/
static inline void rtio_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r)
{
sqe->iodev->api->submit(sqe, r);
}
/**
* @brief Submit I/O requests to the underlying executor
*
* Submits the queue of requested IO operation chains to
* the underlying executor. The underlying executor will decide
* on which hardware and with what sort of parallelism the execution
* of IO chains is performed.
*
* @param r RTIO context
* @param wait_count Count of completions to wait for
* If wait_count is for completions flag is set, the call will not
* return until the desired number of completions are done. A wait count of
* non-zero requires the caller be on a thread.
*
* @retval 0 On success
*/
static inline int rtio_submit(struct rtio *r, uint32_t wait_count)
{
int res;
__ASSERT(r->executor != NULL, "expected rtio submit context to have an executor");
#ifdef CONFIG_RTIO_SUBMIT_SEM
/* TODO undefined behavior if another thread calls submit of course
*/
if (wait_count > 0) {
__ASSERT(!k_is_in_isr(),
"expected rtio submit with wait count to be called from a thread");
k_sem_reset(r->submit_sem);
r->submit_count = wait_count;
}
#endif
/* Enqueue all prepared submissions */
rtio_spsc_produce_all(r->sq);
/* Submit the queue to the executor which consumes submissions
* and produces completions through ISR chains or other means.
*/
res = r->executor->api->submit(r);
if (res != 0) {
return res;
}
/* TODO could be nicer if we could suspend the thread and not
* wake up on each completion here.
*/
#ifdef CONFIG_RTIO_SUBMIT_SEM
if (wait_count > 0) {
res = k_sem_take(r->submit_sem, K_FOREVER);
__ASSERT(res == 0,
"semaphore was reset or timed out while waiting on completions!");
}
#else
while (rtio_spsc_consumable(r->cq) < wait_count) {
#ifdef CONFIG_BOARD_NATIVE_POSIX
k_busy_wait(1);
#else
k_yield();
#endif /* CONFIG_BOARD_NATIVE_POSIX */
}
#endif
return res;
}
/**
* @brief Consume a single completion queue event if available
*
* If a completion queue event is returned rtio_cq_release(r) must be called
* at some point to release the cqe spot for the cqe producer.
*
* @param r RTIO context
*
* @retval cqe A valid completion queue event consumed from the completion queue
* @retval NULL No completion queue event available
*/
static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
{
return rtio_spsc_consume(r->cq);
}
/**
* @brief Wait for and consume a single completion queue event
*
* If a completion queue event is returned rtio_cq_release(r) must be called
* at some point to release the cqe spot for the cqe producer.
*
* @param r RTIO context
*
* @retval cqe A valid completion queue event consumed from the completion queue
*/
static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
{
struct rtio_cqe *cqe;
/* TODO is there a better way? reset this in submit? */
#ifdef CONFIG_RTIO_CONSUME_SEM
k_sem_reset(r->consume_sem);
#endif
cqe = rtio_spsc_consume(r->cq);
while (cqe == NULL) {
cqe = rtio_spsc_consume(r->cq);
#ifdef CONFIG_RTIO_CONSUME_SEM
k_sem_take(r->consume_sem, K_FOREVER);
#else
k_yield();
#endif
}
return cqe;
}
/**
* @brief Inform the executor of a submission completion with success
*
* This may start the next asynchronous request if one is available.
*
* @param r RTIO context
* @param sqe Submission that has succeeded
* @param result Result of the request
*/
static inline void rtio_sqe_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
{
r->executor->api->ok(r, sqe, result);
}
/**
* @brief Inform the executor of a submissions completion with error
*
* This SHALL fail the remaining submissions in the chain.
*
* @param r RTIO context
* @param sqe Submission that has failed
* @param result Result of the request
*/
static inline void rtio_sqe_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
{
r->executor->api->err(r, sqe, result);
}
/**
* Submit a completion queue event with a given result and userdata
*
* Called by the executor to produce a completion queue event, no inherent
* locking is performed and this is not safe to do from multiple callers.
*
* @param r RTIO context
* @param result Integer result code (could be -errno)
* @param userdata Userdata to pass along to completion
*/
static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
{
struct rtio_cqe *cqe = rtio_spsc_acquire(r->cq);
if (cqe == NULL) {
atomic_inc(&r->xcqcnt);
} else {
cqe->result = result;
cqe->userdata = userdata;
rtio_spsc_produce(r->cq);
}
#ifdef CONFIG_RTIO_SUBMIT_SEM
if (r->submit_count > 0) {
r->submit_count--;
if (r->submit_count == 0) {
k_sem_give(r->submit_sem);
}
}
#endif
#ifdef CONFIG_RTIO_CONSUME_SEM
k_sem_give(r->consume_sem);
#endif
}
/* TODO add rtio_sqe_suspend() for suspending a submission chain that must
* wait on other in progress submissions or submission chains.
*/
/**
* @}
*/
#ifdef __cplusplus
}
#endif
#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_H_ */

View file

@ -0,0 +1,92 @@
/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_SIMPLE_H_
#define ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_SIMPLE_H_
#include <zephyr/rtio/rtio.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief RTIO Simple Executor
*
* Provides the simplest possible executor without any concurrency
* or reinterpretation of requests.
*
* @defgroup rtio_executor_simple RTIO Simple Executor
* @ingroup rtio
* @{
*/
/**
* @brief Submit to the simple executor
*
* @param r RTIO context to submit
*
* @retval 0 always succeeds
*/
int rtio_simple_submit(struct rtio *r);
/**
* @brief Report a SQE has completed successfully
*
* @param r RTIO context to use
* @param sqe RTIO SQE to report success
* @param result Result of the SQE
*/
void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result);
/**
* @brief Report a SQE has completed with error
*
* @param r RTIO context to use
* @param sqe RTIO SQE to report success
* @param result Result of the SQE
*/
void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result);
/**
* @brief Simple Executor
*/
struct rtio_simple_executor {
struct rtio_executor ctx;
};
/**
* @cond INTERNAL_HIDDEN
*/
static const struct rtio_executor_api z_rtio_simple_api = {
.submit = rtio_simple_submit,
.ok = rtio_simple_ok,
.err = rtio_simple_err
};
/**
* @endcond INTERNAL_HIDDEN
*/
/**
* @brief Define a simple executor with a given name
*
* @param name Symbol name, must be unique in the context in which its used
*/
#define RTIO_EXECUTOR_SIMPLE_DEFINE(name) \
struct rtio_simple_executor name = { .ctx = { .api = &z_rtio_simple_api } };
/**
* @}
*/
#ifdef __cplusplus
}
#endif
#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_SIMPLE_H_ */

View file

@ -0,0 +1,258 @@
/*
* Copyright (c) 2022 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef ZEPHYR_RTIO_SPSC_H_
#define ZEPHYR_RTIO_SPSC_H_
#include <stdint.h>
#include <stdbool.h>
#include <zephyr/sys/atomic.h>
/**
* @brief RTIO Single Producer Single Consumer (SPSC) Queue API
* @defgroup rtio_spsc RTIO SPSC API
* @ingroup rtio
* @{
*/
/**
* @file rtio_spsc.h
*
* @brief A lock-free and type safe power of 2 fixed sized single producer
* single consumer (SPSC) queue using a ringbuffer and atomics to ensure
* coherency.
*
* This SPSC queue implementation works on an array which wraps using a power of
* two size and uses a bit mask to perform a modulus. Atomics are used to allow
* single-producer single-consumer safe semantics without locks. Elements are
* expected to be of a fixed size. The API is type safe as the underlying buffer
* is typed and all usage is done through macros.
*
* An SPSC queue may be declared on a stack or statically and work as intended so
* long as its lifetime outlives any usage. Static declarations should be the
* preferred method as stack . It is meant to be a shared object between two
* execution contexts (ISR and a thread for example)
*
* An SPSC queue is safe to produce or consume in an ISR with O(1) push/pull.
*
* @warning SPSC is *not* safe to produce or consume in multiple execution
* contexts.
*
* Safe usage would be, where A and B are unique execution contexts:
* 1. ISR A producing and a Thread B consuming.
* 2. Thread A producing and ISR B consuming.
* 3. Thread A producing and Thread B consuming.
* 4. ISR A producing and ISR B consuming.
*/
/**
* @private
* @brief Common SPSC attributes
*
* @warning Not to be manipulated without the macros!
*/
struct rtio_spsc {
/* private value only the producer thread should mutate */
unsigned long acquire;
/* private value only the consumer thread should mutate */
unsigned long consume;
/* producer mutable, consumer readable */
atomic_t in;
/* consumer mutable, producer readable */
atomic_t out;
/* mask used to automatically wrap values */
const unsigned long mask;
};
/**
* @brief Statically initialize an rtio_spsc
*
* @param name Name of the spsc symbol to be provided
* @param type Type stored in the spsc
* @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
*/
#define RTIO_SPSC_INITIALIZER(name, type, sz) \
{ ._spsc = { \
.acquire = 0, \
.consume = 0, \
.in = ATOMIC_INIT(0), \
.out = ATOMIC_INIT(0), \
.mask = sz - 1, \
} \
}
/**
* @brief Declare an anonymous struct type for an rtio_spsc
*
* @param name Name of the spsc symbol to be provided
* @param type Type stored in the spsc
* @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
*/
#define RTIO_SPSC_DECLARE(name, type, sz) \
struct rtio_spsc_ ## name { \
struct rtio_spsc _spsc; \
type buffer[sz]; \
}
/**
* @brief Define an rtio_spsc with a fixed size
*
* @param name Name of the spsc symbol to be provided
* @param type Type stored in the spsc
* @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
*/
#define RTIO_SPSC_DEFINE(name, type, sz) \
RTIO_SPSC_DECLARE(name, type, sz) name = \
RTIO_SPSC_INITIALIZER(name, type, sz);
/**
* @brief Size of the SPSC queue
*
* @param spsc SPSC reference
*/
#define rtio_spsc_size(spsc) ((spsc)->_spsc.mask + 1)
/**
* @private
* @brief A number modulo the spsc size, assumes power of 2
*
* @param spsc SPSC reference
* @param i Value to modulo to the size of the spsc
*/
#define z_rtio_spsc_mask(spsc, i) (i & (spsc)->_spsc.mask)
/**
* @brief Initialize/reset a spsc such that its empty
*
* Note that this is not safe to do while being used in a producer/consumer
* situation with multiple calling contexts (isrs/threads).
*
* @param spsc SPSC to initialize/reset
*/
#define rtio_spsc_reset(spsc) \
({ \
(spsc)->_spsc.consume = 0; \
(spsc)->_spsc.acquire = 0; \
atomic_set(&(spsc)->_spsc.in, 0); \
atomic_set(&(spsc)->_spsc.out, 0); \
})
/**
* @brief Acquire an element to produce from the SPSC
*
* @param spsc SPSC to acquire an element from for producing
*
* @return A pointer to the acquired element or null if the spsc is full
*/
#define rtio_spsc_acquire(spsc) \
({ \
uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.in) + (spsc)->_spsc.acquire; \
bool acq = \
(idx - (uint32_t)atomic_get(&(spsc)->_spsc.out)) < rtio_spsc_size(spsc); \
if (acq) { \
(spsc)->_spsc.acquire += 1; \
} \
acq ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})
/**
* @brief Produce one previously acquired element to the SPSC
*
* This makes one element available to the consumer immediately
*
* @param spsc SPSC to produce the previously acquired element or do nothing
*/
#define rtio_spsc_produce(spsc) \
({ \
if ((spsc)->_spsc.acquire > 0) { \
(spsc)->_spsc.acquire -= 1; \
atomic_add(&(spsc)->_spsc.in, 1); \
} \
})
/**
* @brief Produce all previously acquired elements to the SPSC
*
* This makes all previous acquired elements available to the consumer
* immediately
*
* @param spsc SPSC to produce all previously acquired elements or do nothing
*/
#define rtio_spsc_produce_all(spsc) \
({ \
if ((spsc)->_spsc.acquire > 0) { \
unsigned long acquired = (spsc)->_spsc.acquire; \
(spsc)->_spsc.acquire = 0; \
atomic_add(&(spsc)->_spsc.in, acquired); \
} \
})
/**
* @brief Peek at an element from the spsc
*
* @param spsc Spsc to peek into
*
* @return Pointer to element or null if no consumable elements left
*/
#define rtio_spsc_peek(spsc) \
({ \
uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})
/**
* @brief Consume an element from the spsc
*
* @param spsc Spsc to consume from
*
* @return Pointer to element or null if no consumable elements left
*/
#define rtio_spsc_consume(spsc) \
({ \
uint32_t idx = (uint32_t)atomic_get(&(spsc)->_spsc.out) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != (uint32_t)atomic_get(&(spsc)->_spsc.in)); \
if (has_consumable) { \
(spsc)->_spsc.consume += 1; \
} \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})
/**
* @brief Release a consumed element
*
* @param spsc SPSC to release consumed element or do nothing
*/
#define rtio_spsc_release(spsc) \
({ \
if ((spsc)->_spsc.consume > 0) { \
(spsc)->_spsc.consume -= 1; \
atomic_add(&(spsc)->_spsc.out, 1); \
} \
})
/**
* @brief Count of consumables in spsc
*
* @param spsc SPSC to get item count for
*/
#define rtio_spsc_consumable(spsc) \
({ \
(spsc)->_spsc.in \
- (spsc)->_spsc.out \
- (spsc)->_spsc.consume; \
}) \
/**
* @}
*/
#endif /* ZEPHYR_RTIO_SPSC_H_ */

View file

@ -31,3 +31,4 @@ add_subdirectory_ifdef(CONFIG_TIMING_FUNCTIONS timing)
add_subdirectory_ifdef(CONFIG_DEMAND_PAGING demand_paging)
add_subdirectory(modbus)
add_subdirectory(sd)
add_subdirectory(rtio)

View file

@ -68,4 +68,6 @@ source "subsys/tracing/Kconfig"
source "subsys/demand_paging/Kconfig"
source "subsys/rtio/Kconfig"
endmenu

View file

@ -0,0 +1,14 @@
# Copyright (c) 2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
if(CONFIG_RTIO)
zephyr_library()
zephyr_include_directories(${ZEPHYR_BASE}/subsys/rtio)
zephyr_library_sources_ifdef(
CONFIG_RTIO_EXECUTOR_SIMPLE
rtio_executor_simple.c
)
endif()

39
subsys/rtio/Kconfig Normal file
View file

@ -0,0 +1,39 @@
# Copyright (c) 2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
menuconfig RTIO
bool "RTIO"
if RTIO
config RTIO_EXECUTOR_SIMPLE
bool "A simple executor for RTIO"
default y
help
An simple RTIO executor that will execute a queue of requested I/O
operations as if they are a single chain of submission queue entries. This
does not support concurrent chains.
config RTIO_SUBMIT_SEM
bool "Use a semaphore when waiting for completions in rtio_submit"
help
When calling rtio_submit a semaphore is available to sleep the calling
thread for each completion queue event until the wait count is met. This
adds a small RAM overhead for a single semaphore. By default wait_for will
use polling on the completion queue with a k_yield() in between iterations.
config RTIO_CONSUME_SEM
bool "Use a semaphore when waiting for completions in rtio_cqe_consume_block"
help
When calling rtio_cqe_consume_block a semaphore is available to sleep the
calling thread for each completion queue event until the wait count is met.
This adds a small RAM overhead for a single semaphore. By default the call
will use polling on the completion queue with a k_yield() in between
iterations.
module = RTIO
module-str = RTIO
module-help = Sets log level for RTIO support
source "subsys/logging/Kconfig.template.log_config"
endif

View file

@ -0,0 +1,79 @@
/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/rtio/rtio_executor_simple.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor_simple, CONFIG_RTIO_LOG_LEVEL);
/**
* @brief Submit submissions to simple executor
*
* The simple executor provides no concurrency instead
* execution each submission chain one after the next.
*
* @param r RTIO context
*
* @retval 0 Always succeeds
*/
int rtio_simple_submit(struct rtio *r)
{
/* TODO For each submission queue entry chain,
* submit the chain to the first iodev
*/
struct rtio_sqe *sqe = rtio_spsc_consume(r->sq);
if (sqe != NULL) {
LOG_DBG("Calling iodev submit");
rtio_iodev_submit(sqe, r);
}
return 0;
}
/**
* @brief Callback from an iodev describing success
*/
void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
{
rtio_cqe_submit(r, result, sqe->userdata);
rtio_spsc_release(r->sq);
rtio_simple_submit(r);
}
/**
* @brief Callback from an iodev describing error
*/
void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
{
struct rtio_sqe *nsqe;
bool chained;
rtio_cqe_submit(r, result, sqe->userdata);
chained = sqe->flags & RTIO_SQE_CHAINED;
rtio_spsc_release(r->sq);
if (chained) {
nsqe = rtio_spsc_consume(r->sq);
while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) {
rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
rtio_spsc_release(r->sq);
nsqe = rtio_spsc_consume(r->sq);
}
if (nsqe != NULL) {
rtio_iodev_submit(nsqe, r);
}
} else {
/* Now we can submit the next in the queue if we aren't done */
rtio_simple_submit(r);
}
}

View file

@ -0,0 +1,14 @@
# Copyright (c) 2021 Intel Corporation.
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(rtio_api_test)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})
target_include_directories(app PRIVATE
${ZEPHYR_BASE}/include
${ZEPHYR_BASE}/kernel/include
${ZEPHYR_BASE}/arch/${ARCH}/include)

View file

@ -0,0 +1,3 @@
CONFIG_ZTEST=y
CONFIG_LOG=y
CONFIG_RTIO=y

View file

@ -0,0 +1,389 @@
/*
* Copyright (c) 2021 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <errno.h>
#include <ztest.h>
#include <zephyr/kernel.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/rtio/rtio_spsc.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_executor_simple.h>
#include "rtio_iodev_test.h"
/*
* @brief Produce and Consume a single uint32_t in the same execution context
*
* @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release()
*
* @ingroup rtio_tests
*/
void test_produce_consume_size1(void)
{
RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1);
const uint32_t magic = 43219876;
uint32_t *acq = rtio_spsc_acquire(&ezspsc);
zassert_not_null(acq, "Acquire should succeed");
*acq = magic;
uint32_t *acq2 = rtio_spsc_acquire(&ezspsc);
zassert_is_null(acq2, "Acquire should fail");
uint32_t *cons = rtio_spsc_consume(&ezspsc);
zassert_is_null(cons, "Consume should fail");
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
rtio_spsc_produce(&ezspsc);
zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1");
uint32_t *cons2 = rtio_spsc_consume(&ezspsc);
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
zassert_not_null(cons2, "Consume should not fail");
zassert_equal(*cons2, magic, "Consume value should equal magic");
uint32_t *cons3 = rtio_spsc_consume(&ezspsc);
zassert_is_null(cons3, "Consume should fail");
uint32_t *acq3 = rtio_spsc_acquire(&ezspsc);
zassert_is_null(acq3, "Acquire should not succeed");
rtio_spsc_release(&ezspsc);
uint32_t *acq4 = rtio_spsc_acquire(&ezspsc);
zassert_not_null(acq4, "Acquire should succeed");
}
/*&*
* @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking
* and wrap around reads/writes.
*
* @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release()
*
* @ingroup rtio_tests
*/
void test_produce_consume_wrap_around(void)
{
RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4);
for (int i = 0; i < 10; i++) {
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
for (int j = 0; j < 3; j++) {
uint32_t *entry = rtio_spsc_acquire(&ezspsc);
zassert_not_null(entry, "Acquire should succeed");
*entry = i * 3 + j;
rtio_spsc_produce(&ezspsc);
}
zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3");
for (int k = 0; k < 3; k++) {
uint32_t *entry = rtio_spsc_consume(&ezspsc);
zassert_not_null(entry, "Consume should succeed");
zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k");
rtio_spsc_release(&ezspsc);
}
zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0");
}
}
/**
* @brief Ensure that integer wraps continue to work.
*
* Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough
* to ensure integer wraps occur.
*/
void test_int_wrap_around(void)
{
RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4);
ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2);
ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2);
for (int j = 0; j < 3; j++) {
uint32_t *entry = rtio_spsc_acquire(&ezspsc);
zassert_not_null(entry, "Acquire should succeed");
*entry = j;
rtio_spsc_produce(&ezspsc);
}
zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap");
for (int k = 0; k < 3; k++) {
uint32_t *entry = rtio_spsc_consume(&ezspsc);
zassert_not_null(entry, "Consume should succeed");
zassert_equal(*entry, k, "Consume value should equal i*3+k");
rtio_spsc_release(&ezspsc);
}
zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap");
}
#define MAX_RETRIES 5
#define SMP_ITERATIONS 100
RTIO_SPSC_DEFINE(spsc, uint32_t, 4);
static void t1_consume(void *p1, void *p2, void *p3)
{
struct rtio_spsc_spsc *ezspsc = p1;
uint32_t retries = 0;
uint32_t *val = NULL;
for (int i = 0; i < SMP_ITERATIONS; i++) {
val = NULL;
retries = 0;
while (val == NULL && retries < MAX_RETRIES) {
val = rtio_spsc_consume(ezspsc);
retries++;
}
if (val != NULL) {
rtio_spsc_release(ezspsc);
} else {
printk("consumer yield\n");
k_yield();
}
}
}
static void t2_produce(void *p1, void *p2, void *p3)
{
struct rtio_spsc_spsc *ezspsc = p1;
uint32_t retries = 0;
uint32_t *val = NULL;
for (int i = 0; i < SMP_ITERATIONS; i++) {
val = NULL;
retries = 0;
printk("producer acquiring\n");
while (val == NULL && retries < MAX_RETRIES) {
val = rtio_spsc_acquire(ezspsc);
retries++;
}
if (val != NULL) {
*val = SMP_ITERATIONS;
rtio_spsc_produce(ezspsc);
} else {
printk("producer yield\n");
k_yield();
}
}
}
#define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE)
#define THREADS_NUM 2
struct thread_info {
k_tid_t tid;
int executed;
int priority;
int cpu_id;
};
static struct thread_info tinfo[THREADS_NUM];
static struct k_thread tthread[THREADS_NUM];
static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE);
/**
* @brief Test that the producer and consumer are indeed thread safe
*
* This can and should be validated on SMP machines where incoherent
* memory could cause issues.
*/
void test_spsc_threaded(void)
{
tinfo[0].tid =
k_thread_create(&tthread[0], tstack[0], STACK_SIZE,
(k_thread_entry_t)t1_consume,
&spsc, NULL, NULL,
K_PRIO_PREEMPT(5),
K_INHERIT_PERMS, K_NO_WAIT);
tinfo[1].tid =
k_thread_create(&tthread[1], tstack[1], STACK_SIZE,
(k_thread_entry_t)t2_produce,
&spsc, NULL, NULL,
K_PRIO_PREEMPT(5),
K_INHERIT_PERMS, K_NO_WAIT);
k_thread_join(tinfo[1].tid, K_FOREVER);
k_thread_join(tinfo[0].tid, K_FOREVER);
}
RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec);
RTIO_DEFINE(r_simple, (struct rtio_executor *)&simple_exec, 4, 4);
struct rtio_iodev_test iodev_test_simple;
/**
* @brief Test the basics of the RTIO API
*
* Ensures that we can setup an RTIO context, enqueue a request, and receive
* a completion event.
*/
void test_rtio_simple(void)
{
int res;
uintptr_t userdata[2] = {0, 1};
struct rtio_sqe *sqe;
struct rtio_cqe *cqe;
rtio_iodev_test_init(&iodev_test_simple);
TC_PRINT("setting up single no-op\n");
sqe = rtio_spsc_acquire(r_simple.sq);
zassert_not_null(sqe, "Expected a valid sqe");
rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]);
TC_PRINT("submit with wait\n");
res = rtio_submit(&r_simple, 1);
zassert_ok(res, "Should return ok from rtio_execute");
cqe = rtio_spsc_consume(r_simple.cq);
zassert_not_null(cqe, "Expected a valid cqe");
zassert_ok(cqe->result, "Result should be ok");
zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back");
rtio_spsc_release(r_simple.cq);
}
RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec);
RTIO_DEFINE(r_chain, (struct rtio_executor *)&chain_exec, 4, 4);
struct rtio_iodev_test iodev_test_chain[2];
/**
* @brief Test chained requests
*
* Ensures that we can setup an RTIO context, enqueue a chained requests,
* and receive completion events in the correct order given the chained
* flag and multiple devices where serialization isn't guaranteed.
*/
void test_rtio_chain(void)
{
int res;
uintptr_t userdata[4] = {0, 1, 2, 3};
struct rtio_sqe *sqe;
struct rtio_cqe *cqe;
for (int i = 0; i < 2; i++) {
rtio_iodev_test_init(&iodev_test_chain[i]);
}
for (int i = 0; i < 4; i++) {
sqe = rtio_spsc_acquire(r_chain.sq);
zassert_not_null(sqe, "Expected a valid sqe");
rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_chain[i % 2],
&userdata[i]);
sqe->flags |= RTIO_SQE_CHAINED;
}
res = rtio_submit(&r_chain, 4);
zassert_ok(res, "Should return ok from rtio_execute");
zassert_equal(rtio_spsc_consumable(r_chain.cq), 4, "Should have 4 pending completions");
for (int i = 0; i < 4; i++) {
TC_PRINT("consume %d\n", i);
cqe = rtio_spsc_consume(r_chain.cq);
zassert_not_null(cqe, "Expected a valid cqe");
zassert_ok(cqe->result, "Result should be ok");
zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions");
rtio_spsc_release(r_chain.cq);
}
}
RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec);
RTIO_DEFINE(r_multi, (struct rtio_executor *)&multi_exec, 4, 4);
struct rtio_iodev_test iodev_test_multi[2];
/**
* @brief Test multiple asynchronous chains against one iodev
*/
void test_rtio_multiple_chains(void)
{
int res;
uintptr_t userdata[4] = {0, 1, 2, 3};
struct rtio_sqe *sqe;
struct rtio_cqe *cqe;
for (int i = 0; i < 2; i++) {
rtio_iodev_test_init(&iodev_test_multi[i]);
}
for (int i = 0; i < 2; i++) {
for (int j = 0; j < 2; j++) {
sqe = rtio_spsc_acquire(r_multi.sq);
zassert_not_null(sqe, "Expected a valid sqe");
rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_multi[i],
(void *)userdata[i*2 + j]);
if (j == 0) {
sqe->flags |= RTIO_SQE_CHAINED;
} else {
sqe->flags |= 0;
}
}
}
TC_PRINT("calling submit from test case\n");
res = rtio_submit(&r_multi, 0);
zassert_ok(res, "Should return ok from rtio_execute");
bool seen[4] = { 0 };
TC_PRINT("waiting for 4 completions\n");
for (int i = 0; i < 4; i++) {
TC_PRINT("waiting on completion %d\n", i);
cqe = rtio_spsc_consume(r_multi.cq);
while (cqe == NULL) {
k_sleep(K_MSEC(1));
cqe = rtio_spsc_consume(r_multi.cq);
}
zassert_not_null(cqe, "Expected a valid cqe");
TC_PRINT("result %d, would block is %d, inval is %d\n",
cqe->result, -EWOULDBLOCK, -EINVAL);
zassert_ok(cqe->result, "Result should be ok");
seen[(uintptr_t)cqe->userdata] = true;
if (seen[1]) {
zassert_true(seen[0], "Should see 0 before 1");
}
if (seen[3]) {
zassert_true(seen[2], "Should see 2 before 3");
}
rtio_spsc_release(r_multi.cq);
}
}
void test_main(void)
{
ztest_test_suite(rtio_spsc_test,
ztest_1cpu_unit_test(test_produce_consume_size1),
ztest_1cpu_unit_test(test_produce_consume_wrap_around),
ztest_1cpu_unit_test(test_int_wrap_around),
ztest_unit_test(test_spsc_threaded),
ztest_unit_test(test_rtio_simple),
ztest_unit_test(test_rtio_chain),
ztest_unit_test(test_rtio_multiple_chains)
);
ztest_run_test_suite(rtio_spsc_test);
}

View file

@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>
#include "rtio_iodev_test.h"
static void rtio_iodev_timer_fn(struct k_timer *tm)
{
struct rtio_iodev_test *iodev = CONTAINER_OF(tm, struct rtio_iodev_test, timer);
struct rtio *r = iodev->r;
const struct rtio_sqe *sqe = iodev->sqe;
iodev->r = NULL;
iodev->sqe = NULL;
/* Complete the request with Ok and a result */
printk("sqe ok callback\n");
rtio_sqe_ok(r, sqe, 0);
}
static void rtio_iodev_test_submit(const struct rtio_sqe *sqe, struct rtio *r)
{
struct rtio_iodev_test *iodev = (struct rtio_iodev_test *)sqe->iodev;
/**
* TODO this isn't quite right, probably should be equivalent to a
* pend instead of a fail here. This submission chain on this iodev
* needs to wait until the iodev is available again, which should be
* checked after each sqe using the iodev completes. A smart
* executor then should have, much like a thread scheduler, a pend
* list that it checks against on each completion.
*/
if (k_timer_remaining_get(&iodev->timer) != 0) {
printk("would block, timer not free!\n");
rtio_sqe_err(r, sqe, -EWOULDBLOCK);
return;
}
iodev->sqe = sqe;
iodev->r = r;
/**
* Simulate an async hardware request with a one shot timer
*
* In reality the time to complete might have some significant variance
* but this is proof enough of a working API flow.
*
* TODO enable setting this the time here in some way
*/
printk("starting one shot\n");
k_timer_start(&iodev->timer, K_MSEC(10), K_NO_WAIT);
}
const struct rtio_iodev_api rtio_iodev_test_api = {
.submit = rtio_iodev_test_submit,
};
void rtio_iodev_test_init(struct rtio_iodev_test *test)
{
test->iodev.api = &rtio_iodev_test_api;
k_timer_init(&test->timer, rtio_iodev_timer_fn, NULL);
}

View file

@ -0,0 +1,40 @@
/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>
#ifndef RTIO_IODEV_TEST_H_
#define RTIO_IODEV_TEST_H_
/*
* @brief A simple asynchronous testable iodev
*/
struct rtio_iodev_test {
/**
* io device struct as the first member, makes this an rtio_iodev
*/
struct rtio_iodev iodev;
/**
* k_timer for an asynchronous task
*/
struct k_timer timer;
/**
* Currently executing sqe
*/
const struct rtio_sqe *sqe;
/**
* Currently executing rtio context
*/
struct rtio *r;
};
void rtio_iodev_test_init(struct rtio_iodev_test *test);
#endif /* RTIO_IODEV_TEST_H_ */

View file

@ -0,0 +1,7 @@
tests:
subsys.rtio.api:
tags: rtio
subsys.rtio.api.submit_sem:
tags: rtio
extra_configs:
- CONFIG_RTIO_SUBMIT_SEM=y