From 6ae9a981d0bc91de9fb82855d5da18ebbddf4ea5 Mon Sep 17 00:00:00 2001 From: Rodrigo Peixoto Date: Sat, 19 Apr 2025 09:56:50 -0300 Subject: [PATCH] zbus: Improve consistency with runtime observers A previous PR merged (to remove runtime observers' dependency with heap) added inconsistencies and compatibility breaks to the zbus. This commit improves that by removing the inconsistencies and still attending to the features requested by the community. Signed-off-by: Rodrigo Peixoto --- include/zephyr/zbus/zbus.h | 67 ++++++++++++++-- subsys/zbus/Kconfig | 38 ++++++++- subsys/zbus/zbus_runtime_observers.c | 112 ++++++++++++++++++++++++++- 3 files changed, 205 insertions(+), 12 deletions(-) diff --git a/include/zephyr/zbus/zbus.h b/include/zephyr/zbus/zbus.h index c13e77082e7..77fcc84befb 100644 --- a/include/zephyr/zbus/zbus.h +++ b/include/zephyr/zbus/zbus.h @@ -177,6 +177,9 @@ struct zbus_channel_observation_mask { bool enabled; }; +/** + * @brief Structure for linking observers to chanels + */ struct zbus_channel_observation { const struct zbus_channel *chan; const struct zbus_observer *obs; @@ -862,17 +865,23 @@ static inline void zbus_chan_pub_stats_update(const struct zbus_channel *chan) #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__) /** - * @brief Structure for linking observers to chanels + * @brief Structure used to register runtime obeservers + * */ struct zbus_observer_node { sys_snode_t node; const struct zbus_observer *obs; +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE) + const struct zbus_channel *chan; +#endif }; +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE) || defined(__DOXYGEN__) /** * @brief Add an observer to a channel. * - * This routine adds an observer to the channel. + * This routine adds an observer to the channel by providing an allocated node. This function is + * only supported if the CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE is enabled. * * @param chan The channel's reference. * @param obs The observer's reference to be added. @@ -881,13 +890,63 @@ struct zbus_observer_node { * or one of the special values K_NO_WAIT and K_FOREVER. * * @retval 0 Observer added to the channel. + * @retval -EEXIST The observer is already present in the channel's observers list. * @retval -EALREADY The observer is already present in the channel's runtime observers list. * @retval -EAGAIN Waiting period timed out. * @retval -EINVAL Some parameter is invalid. + * @retval -EBUSY The node is already in use. + */ +int zbus_chan_add_obs_with_node(const struct zbus_channel *chan, const struct zbus_observer *obs, + struct zbus_observer_node *node, k_timeout_t timeout); +#else +static inline int zbus_chan_add_obs_with_node(const struct zbus_channel *chan, + const struct zbus_observer *obs, + struct zbus_observer_node *node, k_timeout_t timeout) +{ + ARG_UNUSED(chan); + ARG_UNUSED(obs); + ARG_UNUSED(node); + ARG_UNUSED(timeout); + + return -ENOTSUP; +} +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE */ + +#if !defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE) || defined(__DOXYGEN__) +/** + * @brief Add an observer to a channel. + * + * This routine adds an observer to the channel in runtime. This function is only supported if the + * CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC or + * CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC is enabled. + * + * + * @param chan The channel's reference. + * @param obs The observer's reference to be added. + * @param timeout Waiting period to add an observer, + * or one of the special values K_NO_WAIT and K_FOREVER. + * + * @retval 0 Observer added to the channel. + * @retval -EBUSY Returned without waiting. + * @retval -EAGAIN Waiting period timed out. + * @retval -EEXIST The observer is already present in the channel's observers list. + * @retval -EALREADY The observer is already present in the channel's runtime observers list. + * @retval -ENOMEM No memory available for a new runtime observer node. */ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, - struct zbus_observer_node *node, k_timeout_t timeout); + k_timeout_t timeout); +#else +static inline int zbus_chan_add_obs(const struct zbus_channel *chan, + const struct zbus_observer *obs, k_timeout_t timeout) +{ + ARG_UNUSED(chan); + ARG_UNUSED(obs); + ARG_UNUSED(timeout); + return -ENOTSUP; +} + +#endif /* !CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE */ /** * @brief Remove an observer from a channel. * @@ -899,11 +958,9 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe * or one of the special values K_NO_WAIT and K_FOREVER. * * @retval 0 Observer removed to the channel. - * @retval -EINVAL Invalid data supplied. * @retval -EBUSY Returned without waiting. * @retval -EAGAIN Waiting period timed out. * @retval -ENODATA no observer found in channel's runtime observer list. - * @retval -ENOMEM Returned without waiting. */ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout); diff --git a/subsys/zbus/Kconfig b/subsys/zbus/Kconfig index 9cd518c6215..632fbd5f81f 100644 --- a/subsys/zbus/Kconfig +++ b/subsys/zbus/Kconfig @@ -9,6 +9,10 @@ menuconfig ZBUS if ZBUS +config ZBUS_PREFER_DYNAMIC_ALLOCATION + bool "Set zbus to work with dynamic allocation using the system heap" + default y + config ZBUS_CHANNELS_SYS_INIT_PRIORITY default 5 int "The priority used during the SYS_INIT procedure." @@ -33,7 +37,8 @@ if ZBUS_MSG_SUBSCRIBER choice ZBUS_MSG_SUBSCRIBER_BUF_ALLOC prompt "ZBus msg_subscribers buffer allocation" - default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC + default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC if ZBUS_PREFER_DYNAMIC_ALLOCATION + default ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC config ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC bool "Use heap to allocate msg_subscriber buffers data" @@ -63,6 +68,31 @@ endif # ZBUS_MSG_SUBSCRIBER config ZBUS_RUNTIME_OBSERVERS bool "Runtime observers support." +if ZBUS_RUNTIME_OBSERVERS + +choice ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC + prompt "ZBus runtime observers node allocation" + default ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC if ZBUS_PREFER_DYNAMIC_ALLOCATION + default ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC + +config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC + bool "Use heap to allocate runtime observers node" + +config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC + bool "Use a pool of runtime observers nodes" + +config ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE + bool "Use user-provided runtime observers nodes" + +endchoice + +config ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE + int "Runtime observer pool size" + depends on ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC + default 8 + +endif # ZBUS_RUNTIME_OBSERVERS + config ZBUS_PRIORITY_BOOST bool "ZBus priority boost algorithm" default y @@ -80,9 +110,9 @@ config ZBUS_ASSERT_MOCK config HEAP_MEM_POOL_ADD_SIZE_ZBUS int - default 2048 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && !ZBUS_RUNTIME_OBSERVERS - default 1024 if !ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS - default 3072 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS + default 2048 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && !ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC + default 1024 if !ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC + default 3072 if ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC && ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC module = ZBUS diff --git a/subsys/zbus/zbus_runtime_observers.c b/subsys/zbus/zbus_runtime_observers.c index 41c00c9aae7..fac25ac4b72 100644 --- a/subsys/zbus/zbus_runtime_observers.c +++ b/subsys/zbus/zbus_runtime_observers.c @@ -8,17 +8,63 @@ LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL); -int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, - struct zbus_observer_node *node, k_timeout_t timeout) +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC) +static inline int _zbus_runtime_observer_node_alloc(struct zbus_observer_node **node, + k_timeout_t timeout) +{ + ARG_UNUSED(timeout); + + *node = k_malloc(sizeof(struct zbus_observer_node)); + + _ZBUS_ASSERT(*node != NULL, "could not allocate observer node the heap is full!"); + + if (*node == NULL) { + return -ENOMEM; + } + + return 0; +} + +static inline void _zbus_runtime_observer_node_free(struct zbus_observer_node *node) +{ + k_free(node); +} + +#elif defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_STATIC) + +K_MEM_SLAB_DEFINE_STATIC(_zbus_runtime_observers_slab, sizeof(struct zbus_observer_node), + CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE, 8); + +static inline int _zbus_runtime_observer_node_alloc(struct zbus_observer_node **node, + k_timeout_t timeout) +{ + int err = k_mem_slab_alloc(&_zbus_runtime_observers_slab, (void **)node, timeout); + + _ZBUS_ASSERT(*node != NULL, "not enough runtime observer nodes in the pool. Increase the " + "ZBUS_RUNTIME_OBSERVERS_NODE_POOL_SIZE"); + + return err; +} + +static inline void _zbus_runtime_observer_node_free(struct zbus_observer_node *node) +{ + k_mem_slab_free(&_zbus_runtime_observers_slab, (void *)node); +} + +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_DYNAMIC */ + +static inline int _zbus_runtime_take_chan_sem_and_obs_check(const struct zbus_channel *chan, + const struct zbus_observer *obs, + k_timeout_t timeout) { int err; + struct zbus_observer_node *obs_nd, *tmp; struct zbus_channel_observation *observation; _ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(obs != NULL, "obs is required"); - _ZBUS_ASSERT(node != NULL, "node is required"); err = k_sem_take(&chan->data->sem, timeout); if (err) { @@ -47,7 +93,29 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe } } + return 0; +} + +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE) +int zbus_chan_add_obs_with_node(const struct zbus_channel *chan, const struct zbus_observer *obs, + struct zbus_observer_node *node, k_timeout_t timeout) +{ + int err; + + /* On success the channel semaphore has been taken */ + err = _zbus_runtime_take_chan_sem_and_obs_check(chan, obs, timeout); + if (err) { + return err; + } + + if (node->chan != NULL) { + k_sem_give(&chan->data->sem); + + return -EBUSY; + } + node->obs = obs; + node->chan = chan; sys_slist_append(&chan->data->observers, &node->node); @@ -56,6 +124,39 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe return 0; } +#else + +int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, + k_timeout_t timeout) +{ + int err; + k_timepoint_t end_time = sys_timepoint_calc(timeout); + + /* On success the channel semaphore has been taken */ + err = _zbus_runtime_take_chan_sem_and_obs_check(chan, obs, timeout); + if (err) { + return err; + } + + struct zbus_observer_node *new_obs_nd = NULL; + + err = _zbus_runtime_observer_node_alloc(&new_obs_nd, sys_timepoint_timeout(end_time)); + if (err) { + k_sem_give(&chan->data->sem); + + return err; + } + + new_obs_nd->obs = obs; + + sys_slist_append(&chan->data->observers, &new_obs_nd->node); + + k_sem_give(&chan->data->sem); + + return 0; +} +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE */ + int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout) { @@ -75,6 +176,11 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { if (obs_nd->obs == obs) { sys_slist_remove(&chan->data->observers, &prev_obs_nd->node, &obs_nd->node); +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS_NODE_ALLOC_NONE) + obs_nd->chan = NULL; +#else + _zbus_runtime_observer_node_free(obs_nd); +#endif k_sem_give(&chan->data->sem);