diff --git a/cmake/linker_script/common/common-ram.cmake b/cmake/linker_script/common/common-ram.cmake index b625ac9daa5..e6ef59eaf29 100644 --- a/cmake/linker_script/common/common-ram.cmake +++ b/cmake/linker_script/common/common-ram.cmake @@ -103,8 +103,8 @@ if(CONFIG_ZTEST_NEW_API) endif() if(CONFIG_ZBUS) - zephyr_iterable_section(NAME zbus_channel GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4) zephyr_iterable_section(NAME zbus_observer GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4) + zephyr_iterable_section(NAME zbus_channel_observation_mask GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4) endif() if(CONFIG_UVB) diff --git a/cmake/linker_script/common/common-rom.cmake b/cmake/linker_script/common/common-rom.cmake index 470996e470e..d81b215facc 100644 --- a/cmake/linker_script/common/common-rom.cmake +++ b/cmake/linker_script/common/common-rom.cmake @@ -213,3 +213,8 @@ endif() if(CONFIG_USBD_MSC_CLASS) zephyr_iterable_section(NAME usbd_msc_lun KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) endif() + +if(CONFIG_ZBUS) + zephyr_iterable_section(NAME zbus_channel KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) + zephyr_iterable_section(NAME zbus_channel_observation KVMA RAM_REGION GROUP RODATA_REGION SUBALIGN 4) +endif() diff --git a/include/zephyr/linker/common-ram.ld b/include/zephyr/linker/common-ram.ld index e354c723a2f..bfaa26ed400 100644 --- a/include/zephyr/linker/common-ram.ld +++ b/include/zephyr/linker/common-ram.ld @@ -124,6 +124,11 @@ ITERABLE_SECTION_RAM(sensing_sensor, 4) #endif /* CONFIG_SENSING */ +#if defined(CONFIG_ZBUS) + ITERABLE_SECTION_RAM(zbus_observer, 4) + ITERABLE_SECTION_RAM(zbus_channel_observation_mask, 1) +#endif /* CONFIG_ZBUS */ + #ifdef CONFIG_USERSPACE _static_kernel_objects_end = .; #endif diff --git a/include/zephyr/linker/common-rom/common-rom-misc.ld b/include/zephyr/linker/common-rom/common-rom-misc.ld index 35c6f011305..74657c847a1 100644 --- a/include/zephyr/linker/common-rom/common-rom-misc.ld +++ b/include/zephyr/linker/common-rom/common-rom-misc.ld @@ -34,6 +34,11 @@ ITERABLE_SECTION_ROM(emul, 4) #endif /* CONFIG_EMUL */ +#if defined(CONFIG_ZBUS) + ITERABLE_SECTION_ROM(zbus_channel, 4) + ITERABLE_SECTION_ROM(zbus_channel_observation, 4) +#endif /* CONFIG_ZBUS */ + SECTION_DATA_PROLOGUE(symbol_to_keep,,) { __symbol_to_keep_start = .; diff --git a/include/zephyr/zbus/zbus.h b/include/zephyr/zbus/zbus.h index eba0b13f38d..37765c96c90 100644 --- a/include/zephyr/zbus/zbus.h +++ b/include/zephyr/zbus/zbus.h @@ -22,6 +22,35 @@ extern "C" { * @{ */ +/** + * @brief Type used to represent a channel mutable data. + * + * Every channel has a zbus_channel_data structure associated. + */ +struct zbus_channel_data { + /** Static channel observer list start index. Considering the ITERABLE SECTIONS allocation + * order. + */ + int16_t observers_start_idx; + + /** Static channel observer list end index. Considering the ITERABLE SECTIONS allocation + * order. + */ + int16_t observers_end_idx; + + /** Access control mutex. Points to the mutex used to avoid race conditions + * for accessing the channel. + */ + struct k_mutex mutex; + +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__) + /** Channel observer list. Represents the channel's observers list, it can be empty + * or have listeners and subscribers mixed in any sequence. It can be changed in runtime. + */ + sys_slist_t observers; +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ +}; + /** * @brief Type used to represent a channel. * @@ -33,40 +62,37 @@ struct zbus_channel { /** Channel name. */ const char *const name; #endif + /** Message reference. Represents the message's reference that points to the actual + * shared memory region. + */ + void *const message; + /** Message size. Represents the channel's message size. */ - const uint16_t message_size; + const size_t message_size; /** User data available to extend zbus features. The channel must be claimed before * using this field. */ void *const user_data; - /** Message reference. Represents the message's reference that points to the actual - * shared memory region. - */ - void *const message; - /** Message validator. Stores the reference to the function to check the message * validity before actually performing the publishing. No invalid messages can be * published. Every message is valid when this field is empty. */ bool (*const validator)(const void *msg, size_t msg_size); - /** Access control mutex. Points to the mutex used to avoid race conditions - * for accessing the channel. - */ - struct k_mutex *mutex; -#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0) || defined(__DOXYGEN__) - /** Dynamic channel observer list. Represents the channel's observers list, it can be empty - * or have listeners and subscribers mixed in any sequence. It can be changed in runtime. - */ - sys_slist_t *runtime_observers; -#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */ + /** Mutable channel data struct. */ + struct zbus_channel_data *const data; +}; - /** Channel observer list. Represents the channel's observers list, it can be empty or - * have listeners and subscribers mixed in any sequence. - */ - const struct zbus_observer *const *observers; +/** + * @brief Type used to represent an observer type. + * + * A observer can be a listener or a subscriber. + */ +enum __packed zbus_observer_type { + ZBUS_OBSERVER_LISTENER_TYPE, + ZBUS_OBSERVER_SUBSCRIBER_TYPE }; /** @@ -89,16 +115,30 @@ struct zbus_observer { /** Observer name. */ const char *const name; #endif + /** Type indication. */ + enum zbus_observer_type type; + /** Enabled flag. Indicates if observer is receiving notification. */ bool enabled; - /** Observer message queue. It turns the observer into a subscriber. */ - struct k_msgq *const queue; - /** Observer callback function. It turns the observer into a listener. */ - void (*const callback)(const struct zbus_channel *chan); + union { + /** Observer message queue. It turns the observer into a subscriber. */ + struct k_msgq *const queue; + + /** Observer callback function. It turns the observer into a listener. */ + void (*const callback)(const struct zbus_channel *chan); + }; }; /** @cond INTERNAL_HIDDEN */ +struct zbus_channel_observation_mask { + bool enabled; +}; + +struct zbus_channel_observation { + const struct zbus_channel *const chan; + const struct zbus_observer *const obs; +}; #if defined(CONFIG_ZBUS_ASSERT_MOCK) #define _ZBUS_ASSERT(_cond, _fmt, ...) \ @@ -122,13 +162,13 @@ struct zbus_observer { #if defined(CONFIG_ZBUS_OBSERVER_NAME) #define ZBUS_OBSERVER_NAME_INIT(_name) .name = #_name, -#define _ZBUS_OBS_NAME(_obs) (_obs)->name +#define _ZBUS_OBS_NAME(_obs) (_obs)->name #else #define ZBUS_OBSERVER_NAME_INIT(_name) #define _ZBUS_OBS_NAME(_obs) "" #endif -#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0 +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) #define ZBUS_RUNTIME_OBSERVERS_LIST_DECL(_slist_name) static sys_slist_t _slist_name #define ZBUS_RUNTIME_OBSERVERS_LIST_INIT(_slist_name) .runtime_observers = &_slist_name, #else @@ -136,26 +176,84 @@ struct zbus_observer { #define ZBUS_RUNTIME_OBSERVERS_LIST_INIT(_slist_name) /* No runtime observers */ #endif -#if defined(CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS) -#define _ZBUS_STRUCT_DECLARE(_type, _name) STRUCT_SECTION_ITERABLE(_type, _name) -#else -#define _ZBUS_STRUCT_DECLARE(_type, _name) struct _type _name -#endif /* CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS */ - #define _ZBUS_OBS_EXTERN(_name) extern struct zbus_observer _name #define _ZBUS_CHAN_EXTERN(_name) extern const struct zbus_channel _name #define ZBUS_REF(_value) &(_value) +#define FOR_EACH_FIXED_ARG_NONEMPTY_TERM(F, sep, fixed_arg, ...) \ + COND_CODE_0(/* are there zero non-empty arguments ? */ \ + NUM_VA_ARGS_LESS_1( \ + LIST_DROP_EMPTY(__VA_ARGS__, _)), /* if so, expand to nothing */ \ + (), /* otherwise, expand to: */ \ + (FOR_EACH_IDX_FIXED_ARG( \ + F, sep, fixed_arg, \ + LIST_DROP_EMPTY(__VA_ARGS__)) /* plus a final terminator */ \ + __DEBRACKET sep)) + +#define _ZBUS_OBSERVATION_PREFIX(_idx) \ + GET_ARG_N(_idx, 00, 01, 02, 03, 04, 05, 06, 07, 08, 09, 10, 11, 12, 13, 14, 15, 16, 17, \ + 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, \ + 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, \ + 58, 59, 60, 61, 62, 63) + +#define _ZBUS_CHAN_OBSERVATION(_idx, _obs, _chan) \ + const STRUCT_SECTION_ITERABLE( \ + zbus_channel_observation, \ + _CONCAT(_chan, _ZBUS_OBSERVATION_PREFIX(UTIL_INC(_idx)))) = {.chan = &_chan, \ + .obs = &_obs}; \ + STRUCT_SECTION_ITERABLE(zbus_channel_observation_mask, \ + _CONCAT(_CONCAT(_chan, _ZBUS_OBSERVATION_PREFIX(UTIL_INC(_idx))), \ + _mask)) = {.enabled = false}; + +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__) +#define _ZBUS_RUNTIME_OBSERVERS(_name) .observers = &(_CONCAT(_observers_, _name)), +#define _ZBUS_RUNTIME_OBSERVERS_DECL(_name) static sys_slist_t _CONCAT(_observers_, _name); +#else +#define _ZBUS_RUNTIME_OBSERVERS(_name) +#define _ZBUS_RUNTIME_OBSERVERS_DECL(_name) +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ + +k_timeout_t _zbus_timeout_remainder(uint64_t end_ticks); /** @endcond */ +/** + * @brief Add a static channel observervation. + * + * This macro initializes a channel observation by receiving the + * channel and the observer. + * + * @param _chan Channel instance. + * @param _obs Observer instance. + * @param _masked Observation state. + * @param _prio Observer notification sequence priority. + */ +#define ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, _masked, _prio) \ + const STRUCT_SECTION_ITERABLE(zbus_channel_observation, \ + _CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs))) = { \ + .chan = &_chan, .obs = &_obs}; \ + STRUCT_SECTION_ITERABLE(zbus_channel_observation_mask, \ + _CONCAT(_CONCAT(_CONCAT(_chan, zz), _CONCAT(_prio, _obs)), \ + _mask)) = {.enabled = _masked} +/** + * @brief Add a static channel observervation. + * + * This macro initializes a channel observation by receiving the + * channel and the observer. + * + * @param _chan Channel instance. + * @param _obs Observer instance. + * @param _prio Observer notification sequence priority. + */ +#define ZBUS_CHAN_ADD_OBS(_chan, _obs, _prio) ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, false, _prio) + /** * @def ZBUS_OBS_DECLARE * This macro list the observers to be used in a file. Internally, it declares the observers with * the extern statement. Note it is only necessary when the observers are declared outside the file. */ -#define ZBUS_OBS_DECLARE(...) FOR_EACH(_ZBUS_OBS_EXTERN, (;), __VA_ARGS__) +#define ZBUS_OBS_DECLARE(...) FOR_EACH_NONEMPTY_TERM(_ZBUS_OBS_EXTERN, (;), __VA_ARGS__) /** * @def ZBUS_CHAN_DECLARE @@ -192,23 +290,20 @@ struct zbus_observer { * first the highest priority. * @param _init_val The message initialization. */ -#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \ - static _type _CONCAT(_zbus_message_, _name) = _init_val; \ - static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \ - ZBUS_RUNTIME_OBSERVERS_LIST_DECL(_CONCAT(_runtime_observers_, _name)); \ - FOR_EACH_NONEMPTY_TERM(_ZBUS_OBS_EXTERN, (;), _observers) \ - static const struct zbus_observer *const _CONCAT(_zbus_observers_, _name)[] = { \ - FOR_EACH_NONEMPTY_TERM(ZBUS_REF, (,), _observers) NULL}; \ - const _ZBUS_STRUCT_DECLARE(zbus_channel, _name) = { \ - ZBUS_CHANNEL_NAME_INIT(_name) /* Name */ \ - .message_size = sizeof(_type), /* Message size */ \ - .user_data = _user_data, /* User data */ \ - .message = &_CONCAT(_zbus_message_, _name), /* Reference to the message */\ - .validator = (_validator), /* Validator function */ \ - .mutex = &_CONCAT(_zbus_mutex_, _name), /* Channel's Mutex */ \ - ZBUS_RUNTIME_OBSERVERS_LIST_INIT( \ - _CONCAT(_runtime_observers_, _name)) /* Runtime observer list */ \ - .observers = _CONCAT(_zbus_observers_, _name)} /* Static observer list */ +#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val) \ + static _type _CONCAT(_zbus_message_, _name) = _init_val; \ + static struct zbus_channel_data _CONCAT(_zbus_chan_data_, _name) = { \ + .observers_start_idx = -1, .observers_end_idx = -1}; \ + static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); \ + const STRUCT_SECTION_ITERABLE(zbus_channel, _name) = { \ + ZBUS_CHANNEL_NAME_INIT(_name) /* Maybe removed */ \ + .message = &_CONCAT(_zbus_message_, _name), \ + .message_size = sizeof(_type), .user_data = _user_data, .validator = (_validator), \ + .data = &_CONCAT(_zbus_chan_data_, _name)}; \ + /* Extern declaration of observers */ \ + ZBUS_OBS_DECLARE(_observers); \ + /* Create all channel observations from observers list */ \ + FOR_EACH_FIXED_ARG_NONEMPTY_TERM(_ZBUS_CHAN_OBSERVATION, (;), _name, _observers) /** * @brief Initialize a message. @@ -233,14 +328,27 @@ struct zbus_observer { * * @param[in] _name The subscriber's name. * @param[in] _queue_size The notification queue's size. + * @param[in] _enable The subscriber initial enable state. */ -#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size) \ +#define ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, _enable) \ K_MSGQ_DEFINE(_zbus_observer_queue_##_name, sizeof(const struct zbus_channel *), \ _queue_size, sizeof(const struct zbus_channel *)); \ - _ZBUS_STRUCT_DECLARE(zbus_observer, \ - _name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ - .enabled = true, \ - .queue = &_zbus_observer_queue_##_name, .callback = NULL} + STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ + ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_SUBSCRIBER_TYPE, \ + .enabled = _enable, .queue = &_zbus_observer_queue_##_name} +/** + * @brief Define and initialize a subscriber. + * + * This macro defines an observer of subscriber type. It defines a message queue where the + * subscriber will receive the notification asynchronously, and initialize the ``struct + * zbus_observer`` defining the subscriber. + * + * @param[in] _name The subscriber's name. + * @param[in] _queue_size The notification queue's size. + */ +#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size) \ + ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, true) /** * @brief Define and initialize a listener. @@ -251,12 +359,24 @@ struct zbus_observer { * * @param[in] _name The listener's name. * @param[in] _cb The callback function. + * @param[in] _enable The listener initial enable state. */ -#define ZBUS_LISTENER_DEFINE(_name, _cb) \ - _ZBUS_STRUCT_DECLARE(zbus_observer, \ - _name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ - .enabled = true, \ - .queue = NULL, .callback = (_cb)} +#define ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, _enable) \ + STRUCT_SECTION_ITERABLE(zbus_observer, \ + _name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_LISTENER_TYPE, \ + .enabled = _enable, .callback = (_cb)} +/** + * @brief Define and initialize a listener. + * + * This macro defines an observer of listener type. This macro establishes the callback where the + * listener will be notified synchronously and initialize the ``struct zbus_observer`` defining the + * listener. The listeners are defined in the disabled state with this macro. + * + * @param[in] _name The listener's name. + * @param[in] _cb The callback function. + */ +#define ZBUS_LISTENER_DEFINE(_name, _cb) ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, true) /** * @@ -453,7 +573,7 @@ static inline void *zbus_chan_user_data(const struct zbus_channel *chan) return chan->user_data; } -#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0) || defined(__DOXYGEN__) +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) || defined(__DOXYGEN__) /** * @brief Add an observer to a channel. @@ -494,15 +614,6 @@ int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observe int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout); -/** - * @brief Get zbus runtime observers pool. - * - * This routine returns a reference of the runtime observers pool. - * - * @return Reference of runtime observers pool. - */ -struct k_mem_slab *zbus_runtime_obs_pool(void); - /** @cond INTERNAL_HIDDEN */ struct zbus_observer_node { @@ -512,7 +623,7 @@ struct zbus_observer_node { /** @endcond */ -#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */ +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ /** * @brief Change the observer state. @@ -536,6 +647,58 @@ static inline int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled) return 0; } +/** + * @brief Get the observer state. + * + * This routine retrieves the observer state. + * + * @param[in] obs The observer's reference. + * @param[out] enable The boolean output's reference. + * + * @return Observer state. + */ +static inline int zbus_obs_is_enabled(struct zbus_observer *obs, bool *enable) +{ + _ZBUS_ASSERT(obs != NULL, "obs is required"); + _ZBUS_ASSERT(enable != NULL, "enable is required"); + + *enable = obs->enabled; + + return 0; +} + +/** + * @brief Mask notifications from a channel to an observer. + * + * The observer can mask notifications from a specific observing channel by calling this function. + * + * @param obs The observer's reference to be added. + * @param chan The channel's reference. + * @param masked The mask state. When the mask is true, the observer will not receive notifications + * from the channel. + * + * @retval 0 Channel notifications masked to the observer. + * @retval -ESRCH No observation found for the related pair chan/obs. + * @retval -EINVAL Some parameter is invalid. + */ +int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, + const struct zbus_channel *chan, bool masked); + +/** + * @brief Get the notifications masking state from a channel to an observer. + * + * @param obs The observer's reference to be added. + * @param chan The channel's reference. + * @param[out] masked The mask state. When the mask is true, the observer will not receive + * notifications from the channel. + * + * @retval 0 Retrieved the masked state. + * @retval -ESRCH No observation found for the related pair chan/obs. + * @retval -EINVAL Some parameter is invalid. + */ +int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, + const struct zbus_channel *chan, bool *masked); + #if defined(CONFIG_ZBUS_OBSERVER_NAME) || defined(__DOXYGEN__) /** @@ -577,7 +740,6 @@ static inline const char *zbus_obs_name(const struct zbus_observer *obs) int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout); -#if defined(CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS) || defined(__DOXYGEN__) /** * * @brief Iterate over channels. @@ -586,10 +748,28 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c * iterator_func which is called for each channel. If the iterator_func returns false all * the iteration stops. * + * @param[in] iterator_func The function that will be execute on each iteration. + * * @retval true Iterator executed for all channels. * @retval false Iterator could not be executed. Some iterate returned false. */ bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel *chan)); +/** + * + * @brief Iterate over channels with user data. + * + * Enables the developer to iterate over the channels giving to this function an + * iterator_func which is called for each channel. If the iterator_func returns false all + * the iteration stops. + * + * @param[in] iterator_func The function that will be execute on each iteration. + * @param[in] user_data The user data that can be passed in the function. + * + * @retval true Iterator executed for all channels. + * @retval false Iterator could not be executed. Some iterate returned false. + */ +bool zbus_iterate_over_channels_with_user_data( + bool (*iterator_func)(const struct zbus_channel *chan, void *user_data), void *user_data); /** * @@ -599,12 +779,29 @@ bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel * iterator_func which is called for each observer. If the iterator_func returns false all * the iteration stops. * + * @param[in] iterator_func The function that will be execute on each iteration. + * * @retval true Iterator executed for all channels. * @retval false Iterator could not be executed. Some iterate returned false. */ bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observer *obs)); +/** + * + * @brief Iterate over observers with user data. + * + * Enables the developer to iterate over the observers giving to this function an + * iterator_func which is called for each observer. If the iterator_func returns false all + * the iteration stops. + * + * @param[in] iterator_func The function that will be execute on each iteration. + * @param[in] user_data The user data that can be passed in the function. + * + * @retval true Iterator executed for all channels. + * @retval false Iterator could not be executed. Some iterate returned false. + */ +bool zbus_iterate_over_observers_with_user_data( + bool (*iterator_func)(const struct zbus_observer *obs, void *user_data), void *user_data); -#endif /* CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS */ /** * @} */ diff --git a/subsys/zbus/CMakeLists.txt b/subsys/zbus/CMakeLists.txt index d6511501a20..6284e8a37fa 100644 --- a/subsys/zbus/CMakeLists.txt +++ b/subsys/zbus/CMakeLists.txt @@ -4,11 +4,8 @@ zephyr_library() zephyr_library_sources(zbus.c) -if(CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE GREATER 0) +if(CONFIG_ZBUS_RUNTIME_OBSERVERS) zephyr_library_sources(zbus_runtime_observers.c) endif() -if(CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS) - zephyr_library_sources(zbus_iterable_sections.c) - zephyr_linker_sources(DATA_SECTIONS zbus.ld) -endif() +zephyr_library_sources(zbus_iterable_sections.c) diff --git a/subsys/zbus/Kconfig b/subsys/zbus/Kconfig index 338b21361f9..622f89641a2 100644 --- a/subsys/zbus/Kconfig +++ b/subsys/zbus/Kconfig @@ -8,10 +8,9 @@ menuconfig ZBUS if ZBUS -config ZBUS_STRUCTS_ITERABLE_ACCESS - bool "Zbus iterable sections support." - depends on !XTENSA - default y +config ZBUS_CHANNELS_SYS_INIT_PRIORITY + default 5 + int "The priority used during the SYS_INIT procedure." config ZBUS_CHANNEL_NAME bool "Channel name field" @@ -19,14 +18,9 @@ config ZBUS_CHANNEL_NAME config ZBUS_OBSERVER_NAME bool "Observer name field" -config ZBUS_RUNTIME_OBSERVERS_POOL_SIZE - int "The size of the runtime observers pool." - default 0 - help - When the size is bigger than zero this feature will be enabled. It applies the Object Pool Pattern, - where the objects in the pool are pre-allocated and can be used and recycled after use. The - technique avoids dynamic allocation and allows the code to increase the number of observers by - only changing a configuration. +config ZBUS_RUNTIME_OBSERVERS + bool "Runtime observers support." + default n config ZBUS_ASSERT_MOCK bool "Zbus assert mock for test purposes." diff --git a/subsys/zbus/zbus.c b/subsys/zbus/zbus.c index 7c55e50f434..abaf92c8641 100644 --- a/subsys/zbus/zbus.c +++ b/subsys/zbus/zbus.c @@ -4,118 +4,145 @@ */ #include +#include +#include #include #include #include LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL); -#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0) -static inline void _zbus_notify_runtime_listeners(const struct zbus_channel *chan) +int _zbus_init(void) { - __ASSERT(chan != NULL, "chan is required"); + const struct zbus_channel *curr = NULL; + const struct zbus_channel *prev = NULL; - struct zbus_observer_node *obs_nd, *tmp; + STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) { + curr = observation->chan; - SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) { - - __ASSERT(obs_nd != NULL, "observer node is NULL"); - - if (obs_nd->obs->enabled && (obs_nd->obs->callback != NULL)) { - obs_nd->obs->callback(chan); + if (prev != curr) { + if (prev == NULL) { + curr->data->observers_start_idx = 0; + curr->data->observers_end_idx = 0; + } else { + curr->data->observers_start_idx = prev->data->observers_end_idx; + curr->data->observers_end_idx = prev->data->observers_end_idx; + } + prev = curr; } + + ++(curr->data->observers_end_idx); } + STRUCT_SECTION_FOREACH(zbus_channel, chan) { + k_mutex_init(&chan->data->mutex); + +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) + sys_slist_init(&chan->data->observers); +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ + } + return 0; +} +SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY); + +static inline int _zbus_notify_observer(const struct zbus_channel *chan, + const struct zbus_observer *obs, k_timepoint_t end_time) +{ + int err = 0; + + if (obs->type == ZBUS_OBSERVER_LISTENER_TYPE) { + obs->callback(chan); + + } else if (obs->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE) { + err = k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); + } else { + CODE_UNREACHABLE; + } + return err; } -static inline int _zbus_notify_runtime_subscribers(const struct zbus_channel *chan, - k_timepoint_t end_time) +static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time) { - __ASSERT(chan != NULL, "chan is required"); + int err = 0; + int last_error = 0; - int last_error = 0, err; + _ZBUS_ASSERT(chan != NULL, "chan is required"); + + /* Static observer event dispatcher logic */ + struct zbus_channel_observation *observation; + struct zbus_channel_observation_mask *observation_mask; + + for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; + i < limit; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); + + _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); + + const struct zbus_observer *obs = observation->obs; + + if (!obs->enabled || observation_mask->enabled) { + continue; + } + + err = _zbus_notify_observer(chan, obs, end_time); + + _ZBUS_ASSERT(err == 0, + "could not deliver notification to observer %s. Error code %d", + _ZBUS_OBS_NAME(obs), err); + + if (err) { + last_error = err; + } + } + +#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) + /* Dynamic observer event dispatcher logic */ struct zbus_observer_node *obs_nd, *tmp; - SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) { + SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { - __ASSERT(obs_nd != NULL, "observer node is NULL"); + _ZBUS_ASSERT(obs_nd != NULL, "observer node is NULL"); - if (obs_nd->obs->enabled && (obs_nd->obs->queue != NULL)) { - err = k_msgq_put(obs_nd->obs->queue, &chan, - sys_timepoint_timeout(end_time)); + const struct zbus_observer *obs = obs_nd->obs; - _ZBUS_ASSERT(err == 0, - "could not deliver notification to observer %s. Error code %d", - _ZBUS_OBS_NAME(obs_nd->obs), err); + if (!obs->enabled) { + continue; + } - if (err) { - last_error = err; - } + err = _zbus_notify_observer(chan, obs, end_time); + + if (err) { + last_error = err; } } +#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ return last_error; } -#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */ - -static int _zbus_notify_observers(const struct zbus_channel *chan, k_timepoint_t end_time) -{ - int last_error = 0, err; - /* Notify static listeners */ - for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) { - if ((*obs)->enabled && ((*obs)->callback != NULL)) { - (*obs)->callback(chan); - } - } - -#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0 - _zbus_notify_runtime_listeners(chan); -#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */ - - /* Notify static subscribers */ - for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) { - if ((*obs)->enabled && ((*obs)->queue != NULL)) { - err = k_msgq_put((*obs)->queue, &chan, sys_timepoint_timeout(end_time)); - _ZBUS_ASSERT(err == 0, "could not deliver notification to observer %s.", - _ZBUS_OBS_NAME(*obs)); - if (err) { - LOG_ERR("Observer %s at %p could not be notified. Error code %d", - _ZBUS_OBS_NAME(*obs), *obs, err); - last_error = err; - } - } - } - -#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0 - err = _zbus_notify_runtime_subscribers(chan, end_time); - if (err) { - last_error = err; - } -#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */ - return last_error; -} int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout) { int err; - k_timepoint_t end_time = sys_timepoint_calc(timeout); _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); + k_timepoint_t end_time = sys_timepoint_calc(timeout); + if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) { return -ENOMSG; } - err = k_mutex_lock(chan->mutex, timeout); + err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } memcpy(chan->message, msg, chan->message_size); - err = _zbus_notify_observers(chan, end_time); + err = _zbus_vded_exec(chan, end_time); - k_mutex_unlock(chan->mutex); + k_mutex_unlock(&chan->data->mutex); return err; } @@ -128,32 +155,33 @@ int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeo _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(msg != NULL, "msg is required"); - err = k_mutex_lock(chan->mutex, timeout); + err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } memcpy(msg, chan->message, chan->message_size); - return k_mutex_unlock(chan->mutex); + return k_mutex_unlock(&chan->data->mutex); } int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout) { int err; - k_timepoint_t end_time = sys_timepoint_calc(timeout); _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); - err = k_mutex_lock(chan->mutex, timeout); + k_timepoint_t end_time = sys_timepoint_calc(timeout); + + err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } - err = _zbus_notify_observers(chan, end_time); + err = _zbus_vded_exec(chan, end_time); - k_mutex_unlock(chan->mutex); + k_mutex_unlock(&chan->data->mutex); return err; } @@ -163,7 +191,7 @@ int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); - int err = k_mutex_lock(chan->mutex, timeout); + int err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; @@ -177,7 +205,7 @@ int zbus_chan_finish(const struct zbus_channel *chan) _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(chan != NULL, "chan is required"); - int err = k_mutex_unlock(chan->mutex); + int err = k_mutex_unlock(&chan->data->mutex); return err; } @@ -195,3 +223,51 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c return k_msgq_get(sub->queue, chan, timeout); } + +int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, + const struct zbus_channel *chan, bool masked) +{ + _ZBUS_ASSERT(obs != NULL, "obs is required"); + _ZBUS_ASSERT(chan != NULL, "chan is required"); + + struct zbus_channel_observation *observation; + struct zbus_channel_observation_mask *observation_mask; + + for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; + i < limit; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); + + _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); + + if (observation->obs == obs) { + observation_mask->enabled = masked; + return 0; + } + } + return -ESRCH; +} + +int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, + const struct zbus_channel *chan, bool *masked) +{ + _ZBUS_ASSERT(obs != NULL, "obs is required"); + _ZBUS_ASSERT(chan != NULL, "chan is required"); + + struct zbus_channel_observation *observation; + struct zbus_channel_observation_mask *observation_mask; + + for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; + i < limit; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask); + + _ZBUS_ASSERT(observation != NULL, "observation must be not NULL"); + + if (observation->obs == obs) { + *masked = observation_mask->enabled; + return 0; + } + } + return -ESRCH; +} diff --git a/subsys/zbus/zbus.ld b/subsys/zbus/zbus.ld deleted file mode 100644 index ffeab1a3165..00000000000 --- a/subsys/zbus/zbus.ld +++ /dev/null @@ -1,4 +0,0 @@ -#include - -ITERABLE_SECTION_RAM(zbus_channel, 4) -ITERABLE_SECTION_RAM(zbus_observer, 4) diff --git a/subsys/zbus/zbus_iterable_sections.c b/subsys/zbus/zbus_iterable_sections.c index 48b3212e596..0504bc42273 100644 --- a/subsys/zbus/zbus_iterable_sections.c +++ b/subsys/zbus/zbus_iterable_sections.c @@ -17,6 +17,17 @@ bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel return true; } +bool zbus_iterate_over_channels_with_user_data( + bool (*iterator_func)(const struct zbus_channel *chan, void *user_data), void *user_data) +{ + STRUCT_SECTION_FOREACH(zbus_channel, chan) { + if (!(*iterator_func)(chan, user_data)) { + return false; + } + } + return true; +} + bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observer *obs)) { STRUCT_SECTION_FOREACH(zbus_observer, obs) { @@ -26,3 +37,14 @@ bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observe } return true; } + +bool zbus_iterate_over_observers_with_user_data( + bool (*iterator_func)(const struct zbus_observer *obs, void *user_data), void *user_data) +{ + STRUCT_SECTION_FOREACH(zbus_observer, obs) { + if (!(*iterator_func)(obs, user_data)) { + return false; + } + } + return true; +} diff --git a/subsys/zbus/zbus_runtime_observers.c b/subsys/zbus/zbus_runtime_observers.c index ea4649c3855..5fc47484dee 100644 --- a/subsys/zbus/zbus_runtime_observers.c +++ b/subsys/zbus/zbus_runtime_observers.c @@ -8,63 +8,59 @@ LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL); -K_MEM_SLAB_DEFINE_STATIC(_zbus_runtime_obs_pool, sizeof(struct zbus_observer_node), - CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE, 4); - -struct k_mem_slab *zbus_runtime_obs_pool(void) -{ - return &_zbus_runtime_obs_pool; -} - int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout) { int err; struct zbus_observer_node *obs_nd, *tmp; - k_timepoint_t end_time = sys_timepoint_calc(timeout); + struct zbus_channel_observation *observation; _ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked"); _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(obs != NULL, "obs is required"); - /* Check if the observer is already a static observer */ - for (const struct zbus_observer *const *static_obs = chan->observers; *static_obs != NULL; - ++static_obs) { - if (*static_obs == obs) { - return -EEXIST; - } - } - - err = k_mutex_lock(chan->mutex, timeout); + err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } + for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; + i < limit; ++i) { + STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); + + __ASSERT(observation != NULL, "observation must be not NULL"); + + if (observation->obs == obs) { + k_mutex_unlock(&chan->data->mutex); + + return -EEXIST; + } + } + /* Check if the observer is already a runtime observer */ - SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) { + SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { if (obs_nd->obs == obs) { - k_mutex_unlock(chan->mutex); + k_mutex_unlock(&chan->data->mutex); return -EALREADY; } } - err = k_mem_slab_alloc(&_zbus_runtime_obs_pool, (void **)&obs_nd, - sys_timepoint_timeout(end_time)); + struct zbus_observer_node *new_obs_nd = k_malloc(sizeof(struct zbus_observer_node)); - if (err) { - LOG_ERR("Could not allocate memory on runtime observers pool\n"); + if (new_obs_nd == NULL) { + LOG_ERR("Could not allocate observer node the heap is full!"); - k_mutex_unlock(chan->mutex); + k_mutex_unlock(&chan->data->mutex); - return err; + return -ENOMEM; } - obs_nd->obs = obs; + new_obs_nd->obs = obs; - sys_slist_append(chan->runtime_observers, &obs_nd->node); + sys_slist_append(&chan->data->observers, &new_obs_nd->node); - k_mutex_unlock(chan->mutex); + k_mutex_unlock(&chan->data->mutex); return 0; } @@ -80,19 +76,18 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer _ZBUS_ASSERT(chan != NULL, "chan is required"); _ZBUS_ASSERT(obs != NULL, "obs is required"); - err = k_mutex_lock(chan->mutex, timeout); + err = k_mutex_lock(&chan->data->mutex, timeout); if (err) { return err; } - SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) { + SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { if (obs_nd->obs == obs) { - sys_slist_remove(chan->runtime_observers, &prev_obs_nd->node, - &obs_nd->node); + sys_slist_remove(&chan->data->observers, &prev_obs_nd->node, &obs_nd->node); - k_mem_slab_free(&_zbus_runtime_obs_pool, (void **)&obs_nd); + k_free(obs_nd); - k_mutex_unlock(chan->mutex); + k_mutex_unlock(&chan->data->mutex); return 0; } @@ -100,7 +95,7 @@ int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer prev_obs_nd = obs_nd; } - k_mutex_unlock(chan->mutex); + k_mutex_unlock(&chan->data->mutex); return -ENODATA; }