zbus: Improve consistency with runtime observers

A previous PR merged (to remove runtime observers' dependency with heap)
added inconsistencies and compatibility breaks to the zbus. This commit
improves that by removing the inconsistencies and still attending to the
features requested by the community.

Signed-off-by: Rodrigo Peixoto <rodrigopex@gmail.com>
This commit is contained in:
Rodrigo Peixoto 2025-04-19 09:56:50 -03:00 committed by Benjamin Cabé
commit 6ae9a981d0
3 changed files with 205 additions and 12 deletions

View file

@ -177,6 +177,9 @@ struct zbus_channel_observation_mask {
bool enabled;
};
/**
* @brief Structure for linking observers to chanels
*/
struct zbus_channel_observation {
const struct zbus_channel *chan;
const struct zbus_observer *obs;
@ -862,17 +865,23 @@ static inline void zbus_chan_pub_stats_update(const struct zbus_channel *chan)
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__)
/**
* @brief Structure for linking observers to chanels
* @brief Structure used to register runtime obeservers
*
*/
struct zbus_observer_node {
sys_snode_t node;
const struct zbus_observer *obs;
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE)
const struct zbus_channel *chan;
#endif
};
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE) || defined(__DOXYGEN__)
/**
* @brief Add an observer to a channel.
*
* This routine adds an observer to the channel.
* This routine adds an observer to the channel by providing an allocated node. This function is
* only supported if the CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE is enabled.
*
* @param chan The channel's reference.
* @param obs The observer's reference to be added.
@ -881,13 +890,63 @@ struct zbus_observer_node {
* or one of the special values K_NO_WAIT and K_FOREVER.
*
* @retval 0 Observer added to the channel.
* @retval -EEXIST The observer is already present in the channel's observers list.
* @retval -EALREADY The observer is already present in the channel's runtime observers list.
* @retval -EAGAIN Waiting period timed out.
* @retval -EINVAL Some parameter is invalid.
* @retval -EBUSY The node is already in use.
*/
int zbus_chan_add_obs_with_node(const struct zbus_channel *chan, const struct zbus_observer *obs,
struct zbus_observer_node *node, k_timeout_t timeout);
#else
static inline int zbus_chan_add_obs_with_node(const struct zbus_channel *chan,
const struct zbus_observer *obs,
struct zbus_observer_node *node, k_timeout_t timeout)
{
ARG_UNUSED(chan);
ARG_UNUSED(obs);
ARG_UNUSED(node);
ARG_UNUSED(timeout);
return -ENOTSUP;
}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE */
#if !defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE) || defined(__DOXYGEN__)
/**
* @brief Add an observer to a channel.
*
* This routine adds an observer to the channel in runtime. This function is only supported if the
* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC or
* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC is enabled.
*
*
* @param chan The channel's reference.
* @param obs The observer's reference to be added.
* @param timeout Waiting period to add an observer,
* or one of the special values K_NO_WAIT and K_FOREVER.
*
* @retval 0 Observer added to the channel.
* @retval -EBUSY Returned without waiting.
* @retval -EAGAIN Waiting period timed out.
* @retval -EEXIST The observer is already present in the channel's observers list.
* @retval -EALREADY The observer is already present in the channel's runtime observers list.
* @retval -ENOMEM No memory available for a new runtime observer node.
*/
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
struct zbus_observer_node *node, k_timeout_t timeout);
k_timeout_t timeout);
#else
static inline int zbus_chan_add_obs(const struct zbus_channel *chan,
const struct zbus_observer *obs, k_timeout_t timeout)
{
ARG_UNUSED(chan);
ARG_UNUSED(obs);
ARG_UNUSED(timeout);
return -ENOTSUP;
}
#endif /* !CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE */
/**
* @brief Remove an observer from a channel.
*
@ -899,11 +958,9 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
* or one of the special values K_NO_WAIT and K_FOREVER.
*
* @retval 0 Observer removed to the channel.
* @retval -EINVAL Invalid data supplied.
* @retval -EBUSY Returned without waiting.
* @retval -EAGAIN Waiting period timed out.
* @retval -ENODATA no observer found in channel's runtime observer list.
* @retval -ENOMEM Returned without waiting.
*/
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout);

View file

@ -9,6 +9,10 @@ menuconfig ZBUS
if ZBUS
config ZBUS_PREFER_DYNAMIC_ALLOCATION
bool "Set zbus to work with dynamic allocation using the system heap"
default y
config ZBUS_CHANNELS_SYS_INIT_PRIORITY
default 5
int "The priority used during the SYS_INIT procedure."
@ -33,7 +37,8 @@ if ZBUS_MSG_SUBSCRIBER
choice ZBUS_MSG_SUBSCRIBER_BUF_ALLOC
prompt "ZBus msg_subscribers buffer allocation"
default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC
default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC if ZBUS_PREFER_DYNAMIC_ALLOCATION
default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC
config ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC
bool "Use heap to allocate msg_subscriber buffers data"
@ -63,6 +68,31 @@ endif # ZBUS_MSG_SUBSCRIBER
config ZBUS_RUNTIME_OBSERVERS
bool "Runtime observers support."
if ZBUS_RUNTIME_OBSERVERS
choice ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC
prompt "ZBus runtime observers node allocation"
default ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC if ZBUS_PREFER_DYNAMIC_ALLOCATION
default ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC
config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC
bool "Use heap to allocate runtime observers node"
config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC
bool "Use a pool of runtime observers nodes"
config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE
bool "Use user-provided runtime observers nodes"
endchoice
config ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE
int "Runtime observer pool size"
depends on ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC
default 8
endif # ZBUS_RUNTIME_OBSERVERS
config ZBUS_PRIORITY_BOOST
bool "ZBus priority boost algorithm"
default y
@ -80,9 +110,9 @@ config ZBUS_ASSERT_MOCK
config HEAP_MEM_POOL_ADD_SIZE_ZBUS
int
default 2048 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && !ZBUS_RUNTIME_OBSERVERS
default 1024 if !ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS
default 3072 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS
default 2048 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && !ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC
default 1024 if !ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC
default 3072 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC
module = ZBUS

View file

@ -8,17 +8,63 @@
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
struct zbus_observer_node *node, k_timeout_t timeout)
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC)
static inline int _zbus_runtime_observer_node_alloc(struct zbus_observer_node **node,
k_timeout_t timeout)
{
ARG_UNUSED(timeout);
*node = k_malloc(sizeof(struct zbus_observer_node));
_ZBUS_ASSERT(*node != NULL, "could not allocate observer node the heap is full!");
if (*node == NULL) {
return -ENOMEM;
}
return 0;
}
static inline void _zbus_runtime_observer_node_free(struct zbus_observer_node *node)
{
k_free(node);
}
#elif defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC)
K_MEM_SLAB_DEFINE_STATIC(_zbus_runtime_observers_slab, sizeof(struct zbus_observer_node),
CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE, 8);
static inline int _zbus_runtime_observer_node_alloc(struct zbus_observer_node **node,
k_timeout_t timeout)
{
int err = k_mem_slab_alloc(&_zbus_runtime_observers_slab, (void **)node, timeout);
_ZBUS_ASSERT(*node != NULL, "not enough runtime observer nodes in the pool. Increase the "
"ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE");
return err;
}
static inline void _zbus_runtime_observer_node_free(struct zbus_observer_node *node)
{
k_mem_slab_free(&_zbus_runtime_observers_slab, (void *)node);
}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC */
static inline int _zbus_runtime_take_chan_sem_and_obs_check(const struct zbus_channel *chan,
const struct zbus_observer *obs,
k_timeout_t timeout)
{
int err;
struct zbus_observer_node *obs_nd, *tmp;
struct zbus_channel_observation *observation;
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(obs != NULL, "obs is required");
_ZBUS_ASSERT(node != NULL, "node is required");
err = k_sem_take(&chan->data->sem, timeout);
if (err) {
@ -47,7 +93,29 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
}
}
return 0;
}
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE)
int zbus_chan_add_obs_with_node(const struct zbus_channel *chan, const struct zbus_observer *obs,
struct zbus_observer_node *node, k_timeout_t timeout)
{
int err;
/* On success the channel semaphore has been taken */
err = _zbus_runtime_take_chan_sem_and_obs_check(chan, obs, timeout);
if (err) {
return err;
}
if (node->chan != NULL) {
k_sem_give(&chan->data->sem);
return -EBUSY;
}
node->obs = obs;
node->chan = chan;
sys_slist_append(&chan->data->observers, &node->node);
@ -56,6 +124,39 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe
return 0;
}
#else
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout)
{
int err;
k_timepoint_t end_time = sys_timepoint_calc(timeout);
/* On success the channel semaphore has been taken */
err = _zbus_runtime_take_chan_sem_and_obs_check(chan, obs, timeout);
if (err) {
return err;
}
struct zbus_observer_node *new_obs_nd = NULL;
err = _zbus_runtime_observer_node_alloc(&new_obs_nd, sys_timepoint_timeout(end_time));
if (err) {
k_sem_give(&chan->data->sem);
return err;
}
new_obs_nd->obs = obs;
sys_slist_append(&chan->data->observers, &new_obs_nd->node);
k_sem_give(&chan->data->sem);
return 0;
}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE */
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout)
{
@ -75,6 +176,11 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
if (obs_nd->obs == obs) {
sys_slist_remove(&chan->data->observers, &prev_obs_nd->node, &obs_nd->node);
#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE)
obs_nd->chan = NULL;
#else
_zbus_runtime_observer_node_free(obs_nd);
#endif
k_sem_give(&chan->data->sem);