samples: rtio: Add sensor batch processing sample application

Adds a new sample application that demonstrates using the RTIO subsystem
to read periodic sensor data directly into buffers allocated by the
application, asynchronously process batches of data with an algorithm,
and recycle buffers back for reading additional sensor data.

The sensor iodev in this application is an timer-driven device that
executes one read request per timer period. It doesn't actually send any
transactions to a real I2C/SPI bus or read any real data into the
application-provided buffers. This timer-driven behavior mimics how a
real sensor periodically triggers a GPIO interrupt when new data is
ready.

The sensor iodev currently uses an internal message queue to store
pending requests from the time they are submitted until the next timer
expiration. At least one pending request needs to be stored by the iodev
to ensure that it has a buffer available to read data into. However,
any more than that should probably be handled by the application, since
it's the application that determines how often it can submit new
requests and therefore how deep the queue needs to be.

The sensor iodev is implemented to support multiple instances with
devicetree, but additional work remains to enable and use more than one
in the application.

Tested on native_posix and frdm_k64f.

Signed-off-by: Maureen Helm <maureen.helm@intel.com>
This commit is contained in:
Maureen Helm 2022-04-26 16:55:47 -05:00 committed by Anas Nashif
commit fd204f31d4
7 changed files with 347 additions and 0 deletions

View file

@ -0,0 +1,11 @@
# Copyright (c) 2022 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(rtio_sensor_batch_processing)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})

View file

@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
/ {
app {
#address-cells = <1>;
#size-cells = <0>;
vsensor0: sensor@0 {
compatible = "vnd,sensor";
reg = <0>;
label = "SENSOR_0";
sample-period = <100>;
sample-size = <16>;
max-msgs = <8>;
status = "okay";
};
vsensor1: sensor@1 {
compatible = "vnd,sensor";
reg = <1>;
label = "SENSOR_1";
sample-period = <120>;
sample-size = <16>;
max-msgs = <4>;
status = "disabled";
};
};
};

View file

@ -0,0 +1,22 @@
# Copyright (c) 2022 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
description: Sensor
include: base.yaml
compatible: "vnd,sensor"
properties:
sample-period:
type: int
required: true
sample-size:
type: int
required: true
max-msgs:
type: int
required: true

View file

@ -0,0 +1,4 @@
CONFIG_LOG=y
CONFIG_LOG_MODE_MINIMAL=y
CONFIG_LOG_DEFAULT_LEVEL=4
CONFIG_RTIO=y

View file

@ -0,0 +1,14 @@
sample:
name: RTIO sample
tests:
sample.rtio.sensor_batch_processing:
tags: rtio
integration_platforms:
- native_posix
harness: console
harness_config:
type: multi_line
regex:
- "(.*)Submitting (.*) read requests"
- "(.*)Start processing (.*) samples"
- "(.*)Finished processing (.*) samples"

View file

@ -0,0 +1,98 @@
/*
* Copyright (c) 2022 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_executor_simple.h>
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(main);
#define N (8)
#define M (N/2)
#define SQ_SZ (N)
#define CQ_SZ (N)
#define NODE_ID DT_INST(0, vnd_sensor)
#define SAMPLE_PERIOD DT_PROP(NODE_ID, sample_period)
#define SAMPLE_SIZE DT_PROP(NODE_ID, sample_size)
#define PROCESS_TIME ((M - 1) * SAMPLE_PERIOD)
RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec);
RTIO_DEFINE(ez_io, (struct rtio_executor *)&simple_exec, SQ_SZ, CQ_SZ);
static uint8_t bufs[N][SAMPLE_SIZE];
void main(void)
{
const struct device *vnd_sensor = DEVICE_DT_GET(NODE_ID);
struct rtio_iodev *iodev = vnd_sensor->data;
/* Fill the entire submission queue. */
for (int n = 0; n < N; n++) {
struct rtio_sqe *sqe = rtio_spsc_acquire(ez_io.sq);
rtio_sqe_prep_read(sqe, iodev, RTIO_PRIO_HIGH, bufs[n],
SAMPLE_SIZE, bufs[n]);
rtio_spsc_produce(ez_io.sq);
}
while (true) {
int m = 0;
uint8_t *userdata[M];
LOG_INF("Submitting %d read requests", M);
rtio_submit(&ez_io, M);
/* Consume completion events until there is enough sensor data
* available to execute a batch processing algorithm, such as
* an FFT.
*/
while (m < M) {
struct rtio_cqe *cqe = rtio_spsc_consume(ez_io.cq);
if (cqe == NULL) {
LOG_DBG("No completion events available");
k_msleep(SAMPLE_PERIOD);
continue;
}
LOG_DBG("Consumed completion event %d", m);
if (cqe->result < 0) {
LOG_ERR("Operation failed");
}
userdata[m] = cqe->userdata;
rtio_spsc_release(ez_io.cq);
m++;
}
/* Here is where we would execute a batch processing algorithm.
* Model as a long sleep that takes multiple sensor sample
* periods. The sensor driver can continue reading new data
* during this time because we submitted more buffers into the
* queue than we needed for the batch processing algorithm.
*/
LOG_INF("Start processing %d samples", M);
for (m = 0; m < M; m++) {
LOG_HEXDUMP_DBG(userdata[m], SAMPLE_SIZE, "Sample data:");
}
k_msleep(PROCESS_TIME);
LOG_INF("Finished processing %d samples", M);
/* Recycle the sensor data buffers and refill the submission
* queue.
*/
for (m = 0; m < M; m++) {
struct rtio_sqe *sqe = rtio_spsc_acquire(ez_io.sq);
rtio_sqe_prep_read(sqe, iodev, RTIO_PRIO_HIGH,
userdata[m], SAMPLE_SIZE,
userdata[m]);
rtio_spsc_produce(ez_io.sq);
}
}
}

View file

@ -0,0 +1,166 @@
/*
* Copyright (c) 2022 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#define DT_DRV_COMPAT vnd_sensor
#include <zephyr/zephyr.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/logging/log.h>
#include <zephyr/sys/util.h>
LOG_MODULE_REGISTER(vnd_sensor);
struct vnd_sensor_msg {
struct rtio_sqe sqe;
struct rtio *r;
};
struct vnd_sensor_config {
uint32_t sample_period;
size_t sample_size;
struct vnd_sensor_msg *msgs;
size_t max_msgs;
};
struct vnd_sensor_data {
struct rtio_iodev iodev;
struct k_timer timer;
const struct device *dev;
struct k_msgq msgq;
uint32_t sample_number;
};
static int vnd_sensor_iodev_read(const struct device *dev, uint8_t *buf,
uint32_t buf_len)
{
const struct vnd_sensor_config *config = dev->config;
struct vnd_sensor_data *data = dev->data;
uint32_t sample_number;
uint32_t key;
LOG_DBG("%s: buf_len = %d, buf = %p", dev->name, buf_len, buf);
key = irq_lock();
sample_number = data->sample_number++;
irq_unlock(key);
if (buf_len < config->sample_size) {
LOG_ERR("%s: Buffer is too small", dev->name);
return -ENOMEM;
}
for (int i = 0; i < MIN(config->sample_size, buf_len); i++) {
buf[i] = sample_number * config->sample_size + i;
}
return 0;
}
static void vnd_sensor_iodev_execute(const struct device *dev,
const struct rtio_sqe *sqe, struct rtio *r)
{
int result;
if (sqe->op == RTIO_OP_RX) {
result = vnd_sensor_iodev_read(dev, sqe->buf, sqe->buf_len);
} else {
LOG_ERR("%s: Invalid op", dev->name);
result = -EINVAL;
}
if (result < 0) {
rtio_sqe_err(r, sqe, result);
} else {
rtio_sqe_ok(r, sqe, result);
}
}
static void vnd_sensor_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r)
{
struct vnd_sensor_data *data = (struct vnd_sensor_data *) sqe->iodev;
const struct device *dev = data->dev;
struct vnd_sensor_msg msg = {
.sqe = *sqe,
.r = r,
};
if (k_msgq_put(&data->msgq, &msg, K_NO_WAIT) != 0) {
LOG_ERR("%s: Could not put a msg", dev->name);
rtio_sqe_err(r, sqe, -EWOULDBLOCK);
}
}
static void vnd_sensor_handle_int(const struct device *dev)
{
struct vnd_sensor_data *data = dev->data;
struct vnd_sensor_msg msg;
if (k_msgq_get(&data->msgq, &msg, K_NO_WAIT) != 0) {
LOG_ERR("%s: Could not get a msg", dev->name);
} else {
vnd_sensor_iodev_execute(dev, &msg.sqe, msg.r);
}
}
static void vnd_sensor_timer_expiry(struct k_timer *timer)
{
struct vnd_sensor_data *data =
CONTAINER_OF(timer, struct vnd_sensor_data, timer);
vnd_sensor_handle_int(data->dev);
}
static int vnd_sensor_init(const struct device *dev)
{
const struct vnd_sensor_config *config = dev->config;
struct vnd_sensor_data *data = dev->data;
uint32_t sample_period = config->sample_period;
data->dev = dev;
k_msgq_init(&data->msgq, (char *) config->msgs,
sizeof(struct vnd_sensor_msg), config->max_msgs);
k_timer_init(&data->timer, vnd_sensor_timer_expiry, NULL);
k_timer_start(&data->timer, K_MSEC(sample_period),
K_MSEC(sample_period));
return 0;
}
static const struct rtio_iodev_api vnd_sensor_iodev_api = {
.submit = vnd_sensor_iodev_submit,
};
#define VND_SENSOR_INIT(n) \
static struct vnd_sensor_msg \
vnd_sensor_msgs_##n[DT_INST_PROP(n, max_msgs)]; \
\
static const struct vnd_sensor_config vnd_sensor_config_##n = { \
.sample_period = DT_INST_PROP(n, sample_period), \
.sample_size = DT_INST_PROP(n, sample_size), \
.msgs = vnd_sensor_msgs_##n, \
.max_msgs = DT_INST_PROP(n, max_msgs), \
}; \
\
static struct vnd_sensor_data vnd_sensor_data_##n = { \
.iodev = { \
.api = &vnd_sensor_iodev_api, \
}, \
}; \
\
DEVICE_DT_INST_DEFINE(n, \
vnd_sensor_init, \
NULL, \
&vnd_sensor_data_##n, \
&vnd_sensor_config_##n, \
POST_KERNEL, \
CONFIG_KERNEL_INIT_PRIORITY_DEVICE, \
NULL);
DT_INST_FOREACH_STATUS_OKAY(VND_SENSOR_INIT)