nanokernel: add optional ring buffer data type

This patch is based on some code contributed by Dirk Brandewie.

This is a generic data structure for queuing data in a FIFO ring
buffer. Any given data enqueued is annotated with an app-specific
type identifier, and small integral value. Use of a data pointer
is optional if the necessary information can be conveyed in the
annotations. We want all the metadata to fit in a single DWORD.
The ring buffer always contains at least one free dword in the
buffer to correctly distinguish between full and empty queues.

Concurrency control is almost nonexistent; depending on usage,
apps may want to introduce the usage of semaphores and/or mutexes
to preserve the integrity of the ring buffer or provide notifications
when data is available.

Change-Id: I860262d2afc96db4476d4c695a92f7da355ab732
Signed-off-by: Andrew Boie <andrew.p.boie@intel.com>
This commit is contained in:
Andrew Boie 2015-08-27 13:07:36 -07:00 committed by Anas Nashif
commit f18f102feb
11 changed files with 586 additions and 0 deletions

View file

@ -12,3 +12,4 @@ provided by the nanokernel.
nanokernel_fifos
nanokernel_lifos
nanokernel_stacks
nanokernel_ring_buffers

View file

@ -0,0 +1,143 @@
.. _nanokernel_ring_buffers:
Nanokernel Ring Buffers
#######################
Definition
**********
The ring buffer is defined in :file:`include/misc/ring_buffer.h` and
:file:`kernel/nanokernel/ring_buffer.c`. This is an array-based
circular buffer, stored in first-in-first-out order. The APIs allow
for enqueueing and retrieval of chunks of data up to 1024 bytes in size,
along with two metadata values (type ID and an app-specific integer).
Unlike nanokernel FIFOs, storage of enqueued items and their metadata
is managed in a fixed buffer and there are no preconditions on the data
enqueued (other than the size limit). Since the size annotation is only
an 8-bit value, sizes are expressed in terms of 32-bit chunks.
Internally, the ring buffer always maintains an empty 32-bit block in the
buffer to distinguish between empty and full buffers. Any given entry
in the buffer will use a 32-bit block for metadata plus any data attached.
If the size of the buffer array is a power of two, the ring buffer will
use more efficient masking instead of expensive modulo operations to
maintain itself.
Concurrency
***********
Concurrency control of ring buffers is not implemented at this level.
Depending on usage (particularly with respect to number of concurrent
readers/writers) applications may need to protect the ring buffer with
mutexes and/or use semaphores to notify consumers that there is data to
read.
For the trivial case of one producer and one consumer, concurrency
shouldn't be needed.
Example: Initializing a Ring Buffer
===================================
There are three ways to initialize a ring buffer. The first two are through use
of macros which defines one (and an associated private buffer) in file scope.
You can declare a fast ring buffer that uses mask operations by declaring
a power-of-two sized buffer:
.. code-block:: c
/* Buffer with 2^8 or 256 elements */
SYS_RING_BUF_DECLARE_POW2(my_ring_buf, 8);
Arbitrary-sized buffers may also be declared with a different macro, but
these will always be slower due to use of modulo operations:
.. code-block:: c
#define MY_RING_BUF_SIZE 93
SYS_RING_BUF_DECLARE_SIZE(my_ring_buf, MY_RING_BUF_SIZE);
Alternatively, a ring buffer may be initialized manually. Whether the buffer
will use modulo or mask operations will be detected automatically:
.. code-block:: c
#define MY_RING_BUF_SIZE 64
struct my_struct {
struct ring_buffer rb;
uint32_t buffer[MY_RING_BUF_SIZE];
...
};
struct my_struct ms;
void init_my_struct {
sys_ring_buf_init(&ms.rb, sizeof(ms.buffer), ms.buffer);
...
}
Example: Enqueuing data
=======================
.. code-block:: c
int ret;
ret = sys_ring_buf_put(&ring_buf, TYPE_FOO, 0, &my_foo, SIZE32_OF(my_foo));
if (ret == -EMSGSIZE) {
... not enough room for the message ..
}
If the type or value fields are sufficient, the data pointer and size may be 0.
.. code-block:: c
int ret;
ret = sys_ring_buf_put(&ring_buf, TYPE_BAR, 17, NULL, 0);
if (ret == -EMSGSIZE) {
... not enough room for the message ..
}
Example: Retrieving data
========================
.. code-block:: c
int ret;
uint32_t data[6];
size = SIZE32_OF(data);
ret = sys_ring_buf_get(&ring_buf, &type, &value, data, &size);
if (ret == -EMSGSIZE) {
printk("Buffer is too small, need %d uint32_t\n", size);
} else if (ret == -EAGAIN) {
printk("Ring buffer is empty\n");
} else {
printk("got item of type %u value &u of size %u dwords\n",
type, value, size);
...
}
APIs
****
The following APIs for ring buffers are provided by :file:`ring_buffer.h`.
+------------------------------------------------+------------------------------------+
| Call | Description |
+================================================+====================================+
| :c:func:`sys_ring_buf_init()` | Initialize a ring buffer. |
+------------------------------------------------+------------------------------------+
| :c:func:`SYS_RING_BUF_DECLARE_POW2()` | Declare and init a file-scope |
| :c:func:`SYS_RING_BUF_DECLARE_SIZE()` | ring buffer. |
+------------------------------------------------+------------------------------------+
| :c:func:`sys_ring_buf_get_space()` | Return the amount of free buffer |
| | storage space in 32-bit dwords |
+------------------------------------------------+------------------------------------+
| :c:func:`sys_ring_buf_is_empty()` | Indicate whether a buffer is empty |
+------------------------------------------------+------------------------------------+
| :c:func:`sys_ring_buf_put()` | Enqueue an item |
+------------------------------------------------+------------------------------------+
| :c:func:`sys_ring_buf_get()` | De-queue an item |
+------------------------------------------------+------------------------------------+

178
include/misc/ring_buffer.h Normal file
View file

@ -0,0 +1,178 @@
/* ring_buffer.h: Simple ring buffer API */
/*
* Copyright (c) 2015 Intel Corporation
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1) Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2) Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3) Neither the name of Wind River Systems nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/** @file */
#ifndef __RING_BUFFER_H__
#define __RING_BUFFER_H__
#include <nanokernel.h>
#include <misc/util.h>
#include <errno.h>
#define SIZE32_OF(x) (sizeof((x))/sizeof(uint32_t))
/**
* @brief A structure to represent a ring buffer
*/
struct ring_buf {
uint32_t head; /**< Index in buf for the head element */
uint32_t tail; /**< Index in buf for the tail element */
uint32_t dropped_put_count; /**< Running tally of the number of failed
* put attempts */
uint32_t size; /**< Size of buf in 32-bit chunks */
uint32_t *buf; /**< Memory region for stored entries */
uint32_t mask; /**< Modulo mask if size is a power of 2 */
};
/**
* @brief Declare a power-of-two sized ring buffer
*
* Use of this macro is preferred over SYS_RING_BUF_DECLARE_SIZE() as it
* will not need to use expensive modulo operations.
*
* @param name File-scoped name of the ring buffer to declare
* @param pow Create a buffer of 2^pow 32-bit elements */
#define SYS_RING_BUF_DECLARE_POW2(name, pow) \
static uint32_t _ring_buffer_data_##name[1 << (pow)]; \
struct ring_buf name = { \
.size = (1 << (pow)), \
.mask = (1 << (pow)) - 1, \
.buf = _ring_buffer_data_##name \
};
/**
* @brief Declare an arbitrary sized ring buffer
*
* A ring buffer declared in this way has more flexibility on buffer size
* but will use more expensive modulo operations to maintain itself.
*
* @param name File-scoped name of the ring buffer to declare
* @param size32 Size of buffer in 32-bit elements */
#define SYS_RING_BUF_DECLARE_SIZE(name, size32) \
static uint32_t _ring_buffer_data_##name[size32]; \
struct ring_buf name = { \
.size = size32, \
.buf = _ring_buffer_data_##name \
};
/**
* @brief Initialize a ring buffer, in cases where DECLARE_RING_BUF_STATIC
* isn't used.
*
* For optimal performance, use size values that are a power of 2 as they
* don't require expensive modulo operations when maintaining the buffer.
*
* @param buf Ring buffer to initialize
* @param size Size of the provided buffer in 32-bit chunks
* @param data Data area for the ring buffer, typically
* uint32_t data[size]
*/
static inline void sys_ring_buf_init(struct ring_buf *buf, uint32_t size,
uint32_t *data)
{
buf->head = 0;
buf->tail = 0;
buf->dropped_put_count = 0;
buf->size = size;
buf->buf = data;
if (is_power_of_two(size)) {
buf->mask = size - 1;
} else {
buf->mask = 0;
}
}
/**
* @brief Determine if a ring buffer is empty
*
* @return nonzero if the buffer is empty */
static inline int sys_ring_buf_is_empty(struct ring_buf *buf)
{
return (buf->head == buf->tail);
}
/**
* @brief Obtain available space in a ring buffer
*
* @param buf Ring buffer to examine
* @return Available space in the buffer in 32-bit chunks
*/
static inline int sys_ring_buf_space_get(struct ring_buf *buf)
{
if (sys_ring_buf_is_empty(buf)) {
return buf->size - 1;
}
if (buf->tail < buf->head) {
return buf->head - buf->tail - 1;
}
/* buf->tail > buf->head */
return (buf->size - buf->tail) + buf->head - 1;
}
/**
* @brief Place an entry into the ring buffer
*
* Concurrency control is not implemented, however no synchronization is needed
* between put() and get() operations as they independently work on the
* tail and head values, respectively.
* Any use-cases involving multiple producers will need to synchronize use
* of this function, by either disabling preemption or using a mutex.
*
* @param buf Ring buffer to insert data to
* @param type Application-specific type identifier
* @param value Integral data to include, application specific
* @param data Pointer to a buffer containing data to enqueue
* @param size32 Size of data buffer, in 32-bit chunks (not bytes)
* @return 0 on success, -ENOSPC if there isn't sufficient space
*/
int sys_ring_buf_put(struct ring_buf *buf, uint16_t type, uint8_t value,
uint32_t *data, uint8_t size32);
/**
* @brief Fetch data from the ring buffer
*
* @param buf Ring buffer to extract data from
* @param type Return storage of the retrieved event type
* @param value Return storage of the data value
* @param data Buffer to copy data into
* @param size32 Indicates the size of the data buffer. On return,
* updated with the actual amount of 32-bit chunks written to the buffer
* @return 0 on success, -EAGAIN if the ring buffer is empty, -EMSGSIZE
* if the supplied buffer is too small (size32 will be updated with
* the actual size needed)
*/
int sys_ring_buf_get(struct ring_buf *buf, uint16_t *type, uint8_t *value,
uint32_t *data, uint8_t *size32);
#endif /* __RING_BUFFER_H__ */

View file

@ -103,6 +103,15 @@ config ENHANCED_SECURITY
Users can customize these settings using the CUSTOM_SECURITY option
in the "Security Options" menu.
config RING_BUFFER
bool
prompt "Enable ring buffers"
default n
help
Enable usage of ring buffers. Similar to nanokernel FIFOs but manage
their own buffer memory and can store arbitrary data. For optimal
performance, use buffer sizes that are a power of 2.
config EVENT_LOGGER
bool
prompt "Enable event logger"

View file

@ -15,3 +15,4 @@ obj-$(CONFIG_ADVANCED_POWER_MANAGEMENT) += idle.o
obj-$(CONFIG_NANO_TIMERS) += nano_timer.o
obj-$(CONFIG_EVENT_LOGGER) += event_logger.o
obj-$(CONFIG_KERNEL_PROFILER) += profiler.o
obj-$(CONFIG_RING_BUFFER) += ring_buffer.o

View file

@ -0,0 +1,119 @@
/* ring_buffer.c: Simple ring buffer API */
/*
* Copyright (c) 2015 Intel Corporation
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1) Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2) Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3) Neither the name of Wind River Systems nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <misc/ring_buffer.h>
/**
* Internal data structure for a buffer header.
*
* We want all of this to fit in a single uint32_t. Every item stored in the
* ring buffer will be one of these headers plus any extra data supplied
*/
struct ring_element {
uint32_t type :16; /**< Application-specific */
uint32_t length :8; /**< length in 32-bit chunks */
uint32_t value :8; /**< Room for small integral values */
};
int sys_ring_buf_put(struct ring_buf *buf, uint16_t type, uint8_t value,
uint32_t *data, uint8_t size32)
{
uint32_t i, space, index, rc;
space = sys_ring_buf_space_get(buf);
if (space >= (size32 + 1)) {
struct ring_element *header =
(struct ring_element *)&buf->buf[buf->tail];
header->type = type;
header->length = size32;
header->value = value;
if (likely(buf->mask)) {
for (i=0; i < size32; ++i) {
index = (i + buf->tail + 1) & buf->mask;
buf->buf[index] = data[i];
}
buf->tail = (buf->tail + size32 + 1) & buf->mask;
} else {
for (i=0; i < size32; ++i) {
index = (i + buf->tail + 1) % buf->size;
buf->buf[index] = data[i];
}
buf->tail = (buf->tail + size32 + 1) % buf->size;
}
rc = 0;
} else {
buf->dropped_put_count++;
rc = -EMSGSIZE;
}
return rc;
}
int sys_ring_buf_get(struct ring_buf *buf, uint16_t *type, uint8_t *value,
uint32_t *data, uint8_t *size32)
{
struct ring_element *header;
uint32_t i, index;
if (sys_ring_buf_is_empty(buf)) {
return -EAGAIN;
}
header = (struct ring_element *) &buf->buf[buf->head];
if (header->length > *size32) {
*size32 = header->length;
return -EMSGSIZE;
}
*size32 = header->length;
*type = header->type;
*value = header->value;
if (likely(buf->mask)) {
for (i = 0; i < header->length; ++i) {
index = (i + buf->head + 1) & buf->mask;
data[i] = buf->buf[index];
}
buf->head = (buf->head + header->length + 1) & buf->mask;
} else {
for (i = 0; i < header->length; ++i) {
index = (i + buf->head + 1) % buf->size;
data[i] = buf->buf[index];
}
buf->head = (buf->head + header->length + 1) % buf->size;
}
return 0;
}

View file

@ -0,0 +1,5 @@
KERNEL_TYPE = nano
PLATFORM_CONFIG ?= basic_atom
CONF_FILE = prj.conf
include $(ZEPHYR_BASE)/Makefile.inc

View file

@ -0,0 +1,2 @@
CONFIG_RING_BUFFER=y

View file

@ -0,0 +1,4 @@
ccflags-y += ${PROJECTINCLUDE} -I${srctree}/samples/include
obj-y = test_ring_buf.o

View file

@ -0,0 +1,121 @@
/* test_ring_buf.c: Simple ring buffer test application */
/*
* Copyright (c) 2015 Intel Corporation
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1) Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2) Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* 3) Neither the name of Wind River Systems nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <tc_util.h>
#include <nanokernel.h>
#include <misc/ring_buffer.h>
SYS_RING_BUF_DECLARE_POW2(ring_buf, 8);
char data[] = "ABCDEFGHIJKLMNOPQRSTUVWX";
#define TYPE 1
#define VALUE 2
#define INITIAL_SIZE 2
void main(void)
{
int ret, put_count, i, rv;
uint32_t getdata[6];
uint8_t getsize, getval;
uint16_t gettype;
int dsize = INITIAL_SIZE;
TC_START("Test ring buffers");
rv = TC_FAIL;
put_count = 0;
while (1) {
ret = sys_ring_buf_put(&ring_buf, TYPE, VALUE,
(uint32_t *)data, dsize);
if (ret == -EMSGSIZE) {
printk("ring buffer is full\n");
break;
}
printk("inserted %d chunks, %d remaining\n", dsize,
sys_ring_buf_space_get(&ring_buf));
dsize = (dsize + 1) % SIZE32_OF(data);
put_count++;
}
getsize = INITIAL_SIZE - 1;
ret = sys_ring_buf_get(&ring_buf, &gettype, &getval, getdata, &getsize);
if (ret != -EMSGSIZE) {
printk("Allowed retreival with insufficient destination buffer space\n");
if (getsize != INITIAL_SIZE)
printk("Correct size wasn't reported back to the caller\n");
goto done;
}
for (i = 0; i < put_count; i++) {
getsize = SIZE32_OF(getdata);
ret = sys_ring_buf_get(&ring_buf, &gettype, &getval, getdata,
&getsize);
if (ret < 0) {
printk("Couldn't retrieve a stored value (%d)\n",
ret);
goto done;
}
printk("got %u chunks of type %u and val %u, %u remaining\n",
getsize, gettype, getval,
sys_ring_buf_space_get(&ring_buf));
if (memcmp((char*)getdata, data, getsize * sizeof(uint32_t))) {
printk("data corrupted\n");
goto done;
}
if (gettype != TYPE) {
printk("type information corrupted\n");
goto done;
}
if (getval != VALUE) {
printk("value information corrupted\n");
goto done;
}
}
getsize = SIZE32_OF(getdata);
ret = sys_ring_buf_get(&ring_buf, &gettype, &getval, getdata,
&getsize);
if (ret != -EAGAIN) {
printk("Got data out of an empty buffer");
goto done;
}
printk("empty buffer detected\n");
rv = TC_PASS;
done:
printk("head: %d tail: %d\n", ring_buf.head, ring_buf.tail);
TC_END_RESULT(rv);
TC_END_REPORT(rv);
}

View file

@ -0,0 +1,3 @@
[test]
tags = core