zbus: Add message bus subsystem to Zephyr
Add zbus message bus as a Zephyr subsystem. No message bus or communication abstraction other than the usual (message queues, mailboxes, etc.) enabled developers to implement event-driven systems in Zephyr quickly. Zbus would fill that gap by providing the community with a lightweight and flexible message bus. The implementation tries to be closest as possible to the existing ones. We use the claim/finish approach, and the API for publishing and reading channels are similar in message queues. Zbus is about channels, messages, and observers. Signed-off-by: Rodrigo Peixoto <rodrigopex@gmail.com>
This commit is contained in:
parent
47d09c04af
commit
b8ecbfaa57
14 changed files with 1411 additions and 0 deletions
14
subsys/zbus/CMakeLists.txt
Normal file
14
subsys/zbus/CMakeLists.txt
Normal file
|
@ -0,0 +1,14 @@
|
|||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
zephyr_library()
|
||||
|
||||
zephyr_library_sources(zbus.c)
|
||||
|
||||
if(CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE GREATER 0)
|
||||
zephyr_library_sources(zbus_runtime_observers.c)
|
||||
endif()
|
||||
|
||||
if(CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS)
|
||||
zephyr_library_sources(zbus_iterable_sections.c)
|
||||
zephyr_linker_sources(DATA_SECTIONS zbus.ld)
|
||||
endif()
|
43
subsys/zbus/Kconfig
Normal file
43
subsys/zbus/Kconfig
Normal file
|
@ -0,0 +1,43 @@
|
|||
# Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
menuconfig ZBUS
|
||||
bool "Zbus support"
|
||||
help
|
||||
Enables support for Zephyr message bus.
|
||||
|
||||
if ZBUS
|
||||
|
||||
config ZBUS_STRUCTS_ITERABLE_ACCESS
|
||||
bool "Zbus iterable sections support."
|
||||
depends on !XTENSA && !ARCH_POSIX
|
||||
default y
|
||||
|
||||
config ZBUS_CHANNEL_NAME
|
||||
bool "Channel name field"
|
||||
|
||||
config ZBUS_OBSERVER_NAME
|
||||
bool "Observer name field"
|
||||
|
||||
config ZBUS_RUNTIME_OBSERVERS_POOL_SIZE
|
||||
int "The size of the runtime observers pool."
|
||||
default 0
|
||||
help
|
||||
When the size is bigger than zero this feature will be enabled. It applies the Object Pool Pattern,
|
||||
where the objects in the pool are pre-allocated and can be used and recycled after use. The
|
||||
technique avoids dynamic allocation and allows the code to increase the number of observers by
|
||||
only changing a configuration.
|
||||
|
||||
config ZBUS_ASSERT_MOCK
|
||||
bool "Zbus assert mock for test purposes."
|
||||
help
|
||||
This configuration enables the developer to change the _ZBUS_ASSERT behavior. When this configuration is
|
||||
enabled, _ZBUS_ASSERT returns -EFAULT instead of assert. It makes it more straightforward to test invalid
|
||||
parameters.
|
||||
|
||||
|
||||
module = ZBUS
|
||||
module-str = zbus
|
||||
source "subsys/logging/Kconfig.template.log_config"
|
||||
|
||||
endif # ZBUS
|
204
subsys/zbus/zbus.c
Normal file
204
subsys/zbus/zbus.c
Normal file
|
@ -0,0 +1,204 @@
|
|||
/*
|
||||
* Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
#include <zephyr/kernel.h>
|
||||
#include <zephyr/logging/log.h>
|
||||
#include <zephyr/sys/printk.h>
|
||||
#include <zephyr/zbus/zbus.h>
|
||||
LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
|
||||
|
||||
k_timeout_t _zbus_timeout_remainder(uint64_t end_ticks)
|
||||
{
|
||||
int64_t now_ticks = sys_clock_tick_get();
|
||||
|
||||
return K_TICKS((k_ticks_t)MAX(end_ticks - now_ticks, 0));
|
||||
}
|
||||
|
||||
#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0)
|
||||
static inline void _zbus_notify_runtime_listeners(const struct zbus_channel *chan)
|
||||
{
|
||||
__ASSERT(chan != NULL, "chan is required");
|
||||
|
||||
struct zbus_observer_node *obs_nd, *tmp;
|
||||
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
|
||||
|
||||
__ASSERT(obs_nd != NULL, "observer node is NULL");
|
||||
|
||||
if (obs_nd->obs->enabled && (obs_nd->obs->callback != NULL)) {
|
||||
obs_nd->obs->callback(chan);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static inline int _zbus_notify_runtime_subscribers(const struct zbus_channel *chan,
|
||||
uint64_t end_ticks)
|
||||
{
|
||||
__ASSERT(chan != NULL, "chan is required");
|
||||
|
||||
int last_error = 0, err;
|
||||
struct zbus_observer_node *obs_nd, *tmp;
|
||||
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
|
||||
|
||||
__ASSERT(obs_nd != NULL, "observer node is NULL");
|
||||
|
||||
if (obs_nd->obs->enabled && (obs_nd->obs->queue != NULL)) {
|
||||
err = k_msgq_put(obs_nd->obs->queue, &chan,
|
||||
_zbus_timeout_remainder(end_ticks));
|
||||
|
||||
_ZBUS_ASSERT(err == 0,
|
||||
"could not deliver notification to observer %s. Error code %d",
|
||||
_ZBUS_OBS_NAME(obs_nd->obs), err);
|
||||
|
||||
if (err) {
|
||||
last_error = err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return last_error;
|
||||
}
|
||||
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
|
||||
|
||||
static int _zbus_notify_observers(const struct zbus_channel *chan, uint64_t end_ticks)
|
||||
{
|
||||
int last_error = 0, err;
|
||||
/* Notify static listeners */
|
||||
for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
|
||||
if ((*obs)->enabled && ((*obs)->callback != NULL)) {
|
||||
(*obs)->callback(chan);
|
||||
}
|
||||
}
|
||||
|
||||
#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
|
||||
_zbus_notify_runtime_listeners(chan);
|
||||
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
|
||||
|
||||
/* Notify static subscribers */
|
||||
for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
|
||||
if ((*obs)->enabled && ((*obs)->queue != NULL)) {
|
||||
err = k_msgq_put((*obs)->queue, &chan, _zbus_timeout_remainder(end_ticks));
|
||||
_ZBUS_ASSERT(err == 0, "could not deliver notification to observer %s.",
|
||||
_ZBUS_OBS_NAME(*obs));
|
||||
if (err) {
|
||||
LOG_ERR("Observer %s at %p could not be notified. Error code %d",
|
||||
_ZBUS_OBS_NAME(*obs), *obs, err);
|
||||
last_error = err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
|
||||
err = _zbus_notify_runtime_subscribers(chan, end_ticks);
|
||||
if (err) {
|
||||
last_error = err;
|
||||
}
|
||||
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
|
||||
return last_error;
|
||||
}
|
||||
|
||||
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
|
||||
{
|
||||
int err;
|
||||
uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);
|
||||
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
_ZBUS_ASSERT(msg != NULL, "msg is required");
|
||||
|
||||
if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
|
||||
return -ENOMSG;
|
||||
}
|
||||
|
||||
err = k_mutex_lock(chan->mutex, timeout);
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
memcpy(chan->message, msg, chan->message_size);
|
||||
|
||||
err = _zbus_notify_observers(chan, end_ticks);
|
||||
|
||||
k_mutex_unlock(chan->mutex);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
|
||||
{
|
||||
int err;
|
||||
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
_ZBUS_ASSERT(msg != NULL, "msg is required");
|
||||
|
||||
err = k_mutex_lock(chan->mutex, timeout);
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
memcpy(msg, chan->message, chan->message_size);
|
||||
|
||||
return k_mutex_unlock(chan->mutex);
|
||||
}
|
||||
|
||||
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
|
||||
{
|
||||
int err;
|
||||
uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);
|
||||
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
|
||||
err = k_mutex_lock(chan->mutex, timeout);
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
err = _zbus_notify_observers(chan, end_ticks);
|
||||
|
||||
k_mutex_unlock(chan->mutex);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
|
||||
{
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
|
||||
int err = k_mutex_lock(chan->mutex, timeout);
|
||||
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zbus_chan_finish(const struct zbus_channel *chan)
|
||||
{
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
|
||||
int err = k_mutex_unlock(chan->mutex);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
|
||||
k_timeout_t timeout)
|
||||
{
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
|
||||
_ZBUS_ASSERT(sub != NULL, "sub is required");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
|
||||
if (sub->queue == NULL) {
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
return k_msgq_get(sub->queue, chan, timeout);
|
||||
}
|
2
subsys/zbus/zbus.ld
Normal file
2
subsys/zbus/zbus.ld
Normal file
|
@ -0,0 +1,2 @@
|
|||
ITERABLE_SECTION_RAM(zbus_channel, 4)
|
||||
ITERABLE_SECTION_RAM(zbus_observer, 4)
|
27
subsys/zbus/zbus_iterable_sections.c
Normal file
27
subsys/zbus/zbus_iterable_sections.c
Normal file
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
#include <zephyr/logging/log.h>
|
||||
#include <zephyr/zbus/zbus.h>
|
||||
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
|
||||
|
||||
bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel *chan))
|
||||
{
|
||||
STRUCT_SECTION_FOREACH(zbus_channel, chan) {
|
||||
if (!(*iterator_func)(chan)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observer *obs))
|
||||
{
|
||||
STRUCT_SECTION_FOREACH(zbus_observer, obs) {
|
||||
if (!(*iterator_func)(obs)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
106
subsys/zbus/zbus_runtime_observers.c
Normal file
106
subsys/zbus/zbus_runtime_observers.c
Normal file
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
#include <zephyr/kernel.h>
|
||||
#include <zephyr/logging/log.h>
|
||||
#include <zephyr/zbus/zbus.h>
|
||||
|
||||
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
|
||||
|
||||
K_MEM_SLAB_DEFINE_STATIC(_zbus_runtime_obs_pool, sizeof(struct zbus_observer_node),
|
||||
CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE, 4);
|
||||
|
||||
struct k_mem_slab *zbus_runtime_obs_pool(void)
|
||||
{
|
||||
return &_zbus_runtime_obs_pool;
|
||||
}
|
||||
|
||||
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
|
||||
k_timeout_t timeout)
|
||||
{
|
||||
int err;
|
||||
struct zbus_observer_node *obs_nd, *tmp;
|
||||
uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);
|
||||
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
_ZBUS_ASSERT(obs != NULL, "obs is required");
|
||||
|
||||
/* Check if the observer is already a static observer */
|
||||
for (const struct zbus_observer *const *static_obs = chan->observers; *static_obs != NULL;
|
||||
++static_obs) {
|
||||
if (*static_obs == obs) {
|
||||
return -EEXIST;
|
||||
}
|
||||
}
|
||||
|
||||
err = k_mutex_lock(chan->mutex, timeout);
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
/* Check if the observer is already a runtime observer */
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
|
||||
if (obs_nd->obs == obs) {
|
||||
k_mutex_unlock(chan->mutex);
|
||||
|
||||
return -EALREADY;
|
||||
}
|
||||
}
|
||||
|
||||
err = k_mem_slab_alloc(&_zbus_runtime_obs_pool, (void **)&obs_nd,
|
||||
_zbus_timeout_remainder(end_ticks));
|
||||
|
||||
if (err) {
|
||||
LOG_ERR("Could not allocate memory on runtime observers pool\n");
|
||||
|
||||
k_mutex_unlock(chan->mutex);
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
obs_nd->obs = obs;
|
||||
|
||||
sys_slist_append(chan->runtime_observers, &obs_nd->node);
|
||||
|
||||
k_mutex_unlock(chan->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
|
||||
k_timeout_t timeout)
|
||||
{
|
||||
int err;
|
||||
struct zbus_observer_node *obs_nd, *tmp;
|
||||
struct zbus_observer_node *prev_obs_nd = NULL;
|
||||
|
||||
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
|
||||
_ZBUS_ASSERT(chan != NULL, "chan is required");
|
||||
_ZBUS_ASSERT(obs != NULL, "obs is required");
|
||||
|
||||
err = k_mutex_lock(chan->mutex, timeout);
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
|
||||
if (obs_nd->obs == obs) {
|
||||
sys_slist_remove(chan->runtime_observers, &prev_obs_nd->node,
|
||||
&obs_nd->node);
|
||||
|
||||
k_mem_slab_free(&_zbus_runtime_obs_pool, (void **)&obs_nd);
|
||||
|
||||
k_mutex_unlock(chan->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
prev_obs_nd = obs_nd;
|
||||
}
|
||||
|
||||
k_mutex_unlock(chan->mutex);
|
||||
|
||||
return -ENODATA;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue