net: lwm2m: Timeseries data cache read update

Added support for handle case when all data is not possible to
add in 1 message for Send and Observed Notification.

Notification continuous pending timeseries data is triggred
by iMIN attribute.

Send Operation generate continuous message in multiple lwm2m
message.

Normal Read by server only report back latest stored data.

Signed-off-by: Juha Heiskanen <juha.heiskanen@nordicsemi.no>
This commit is contained in:
Juha Heiskanen 2022-10-20 15:16:55 +03:00 committed by Carles Cufí
commit 05a92f9258
4 changed files with 215 additions and 4 deletions

View file

@ -315,6 +315,9 @@ void lwm2m_reset_message(struct lwm2m_message *msg, bool release)
} else {
msg->message_timeout_cb = NULL;
(void)memset(&msg->cpkt, 0, sizeof(msg->cpkt));
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
msg->cache_info = NULL;
#endif
}
}
@ -338,6 +341,9 @@ int lwm2m_init_message(struct lwm2m_message *msg)
}
lm2m_message_clear_allocations(msg);
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
msg->cache_info = NULL;
#endif
r = coap_packet_init(&msg->cpkt, msg->msg_data, sizeof(msg->msg_data), COAP_VERSION_1,
msg->type, tokenlen, token, msg->code, msg->mid);
@ -1045,17 +1051,31 @@ static int lwm2m_read_resource_data(struct lwm2m_message *msg, void *data_ptr, s
return ret;
}
static int lwm2m_read_cached_data(struct lwm2m_message *msg,
struct lwm2m_time_series_resource *cached_data, uint8_t data_type)
{
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
int ret;
struct lwm2m_time_series_elem buf;
struct lwm2m_cache_read_entry *read_info;
size_t length = lwm2m_cache_size(cached_data);
LOG_DBG("Read cached data size %u", length);
if (msg->cache_info) {
read_info = &msg->cache_info->read_info[msg->cache_info->entry_size];
/* Store original timeseries ring buffer get states for failure handling */
read_info->cache_data = cached_data;
read_info->original_get_base = cached_data->rb.get_base;
read_info->original_get_head = cached_data->rb.get_head;
read_info->original_get_tail = cached_data->rb.get_tail;
msg->cache_info->entry_size++;
if (msg->cache_info->entry_limit) {
length = MIN(length, msg->cache_info->entry_limit);
LOG_DBG("Limited number of read %d", length);
}
}
for (size_t i = 0; i < length; i++) {
if (!lwm2m_cache_read(cached_data, &buf)) {
@ -1125,6 +1145,18 @@ static int lwm2m_read_cached_data(struct lwm2m_message *msg,
#endif
}
static bool lwm2m_accept_timeseries_read(struct lwm2m_message *msg,
struct lwm2m_time_series_resource *cached_data)
{
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
if (cached_data && msg->cache_info && lwm2m_cache_size(cached_data) &&
msg->out.writer->put_data_timestamp) {
return true;
}
#endif
return false;
}
static int lwm2m_read_handler(struct lwm2m_engine_obj_inst *obj_inst, struct lwm2m_engine_res *res,
struct lwm2m_engine_obj_field *obj_field, struct lwm2m_message *msg)
{
@ -1188,8 +1220,7 @@ static int lwm2m_read_handler(struct lwm2m_engine_obj_inst *obj_inst, struct lwm
cached_data = lwm2m_cache_entry_get_by_object(&temp_path);
if (cached_data && lwm2m_cache_size(cached_data) &&
msg->out.writer->put_data_timestamp) {
if (lwm2m_accept_timeseries_read(msg, cached_data)) {
/* Content Format Writer have to support timestamp write */
ret = lwm2m_read_cached_data(msg, cached_data, obj_field->data_type);
} else {
@ -2407,6 +2438,24 @@ static struct lwm2m_obj_path *lwm2m_read_first_path_ptr(sys_slist_t *lwm2m_path_
return &entry->path;
}
static void notify_cached_pending_data_trig(struct observe_node *obs)
{
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
struct lwm2m_time_series_resource *cached_data;
struct lwm2m_obj_path_list *entry;
SYS_SLIST_FOR_EACH_CONTAINER(&obs->path_list, entry, node) {
cached_data = lwm2m_cache_entry_get_by_object(&entry->path);
if (!cached_data || lwm2m_cache_size(cached_data) == 0) {
continue;
}
/* Trig next send by iMin */
lwm2m_notify_observer_path(&entry->path);
}
#endif
}
static int notify_message_reply_cb(const struct coap_packet *response, struct coap_reply *reply,
const struct sockaddr *from)
{
@ -2446,6 +2495,7 @@ static int notify_message_reply_cb(const struct coap_packet *response, struct co
lwm2m_read_first_path_ptr(&obs->path_list),
reply->user_data);
}
notify_cached_pending_data_trig(obs);
}
}
@ -2472,18 +2522,68 @@ static int do_send_op(struct lwm2m_message *msg, uint16_t content_format,
}
}
static bool lwm2m_timeseries_data_rebuild(struct lwm2m_message *msg, int error_code)
{
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
struct lwm2m_cache_read_info *cache_temp;
if (error_code != -ENOMEM) {
return false;
}
cache_temp = msg->cache_info;
if (!cache_temp || !cache_temp->entry_size) {
return false;
}
/* Put Ring buffer back to original */
for (int i = 0; i < cache_temp->entry_size; i++) {
cache_temp->read_info[i].cache_data->rb.get_head =
cache_temp->read_info[i].original_get_head;
cache_temp->read_info[i].cache_data->rb.get_tail =
cache_temp->read_info[i].original_get_tail;
cache_temp->read_info[i].cache_data->rb.get_base =
cache_temp->read_info[i].original_get_base;
}
if (cache_temp->entry_limit) {
/* Limited number of message build fail also */
return false;
}
/* Limit re-build entry count */
cache_temp->entry_limit = LWM2M_LIMITED_TIMESERIES_RESOURCE_COUNT / cache_temp->entry_size;
cache_temp->entry_size = 0;
lwm2m_reset_message(msg, false);
LOG_INF("Try re-buildbuild again with limited cache size %d", cache_temp->entry_limit);
return true;
#else
return false;
#endif
}
int generate_notify_message(struct lwm2m_ctx *ctx, struct observe_node *obs, void *user_data)
{
struct lwm2m_message *msg;
struct lwm2m_engine_obj_inst *obj_inst;
struct lwm2m_obj_path *path;
int ret = 0;
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
struct lwm2m_cache_read_info cache_temp_info;
cache_temp_info.entry_size = 0;
cache_temp_info.entry_limit = 0;
#endif
msg = lwm2m_get_message(ctx);
if (!msg) {
LOG_ERR("Unable to get a lwm2m message!");
return -ENOMEM;
}
msg_init:
if (!obs->composite) {
path = lwm2m_read_first_path_ptr(&obs->path_list);
if (!path) {
@ -2527,6 +2627,9 @@ int generate_notify_message(struct lwm2m_ctx *ctx, struct observe_node *obs, voi
LOG_ERR("Unable to init lwm2m message! (err: %d)", ret);
goto cleanup;
}
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
msg->cache_info = &cache_temp_info;
#endif
/* lwm2m_init_message() cleans the coap reply fields, so we assign our data here */
msg->reply->user_data = user_data;
@ -2549,6 +2652,12 @@ int generate_notify_message(struct lwm2m_ctx *ctx, struct observe_node *obs, voi
}
if (ret < 0) {
if (lwm2m_timeseries_data_rebuild(msg, ret)) {
/* Message Build fail by ENOMEM and data include timeseries data.
* Try rebuild message again by limiting timeseries data entry lenghts.
*/
goto msg_init;
}
LOG_ERR("error in multi-format read (err:%d)", ret);
goto cleanup;
}
@ -2556,6 +2665,9 @@ int generate_notify_message(struct lwm2m_ctx *ctx, struct observe_node *obs, voi
obs->active_tx_operation = true;
obs->resource_update = false;
lwm2m_information_interface_send(msg);
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
msg->cache_info = NULL;
#endif
LOG_DBG("NOTIFY MSG: SENT");
return 0;
@ -2830,6 +2942,45 @@ static void do_send_timeout_cb(struct lwm2m_message *msg)
}
#endif
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
static bool init_next_pending_timeseries_data(struct lwm2m_cache_read_info *cache_temp,
sys_slist_t *lwm2m_path_list,
sys_slist_t *lwm2m_path_free_list)
{
struct lwm2m_obj_path temp;
uint32_t bytes_available = 0;
int ret;
/* Check do we have still pending data to send */
for (int i = 0; i < cache_temp->entry_size; i++) {
if (ring_buf_is_empty(&cache_temp->read_info[i].cache_data->rb)) {
/* Skip Emtpy cached buffers */
continue;
}
ret = lwm2m_string_to_path(cache_temp->read_info[i].cache_data->path, &temp, '/');
if (ret < 0) {
return false;
}
/* Add to linked list */
if (lwm2m_engine_add_path_to_list(lwm2m_path_list, lwm2m_path_free_list, &temp)) {
return false;
}
bytes_available += ring_buf_size_get(&cache_temp->read_info[i].cache_data->rb);
}
if (bytes_available == 0) {
return false;
}
LOG_INF("Allocate a new message for pending data %u", bytes_available);
cache_temp->entry_size = 0;
cache_temp->entry_limit = 0;
return true;
}
#endif
int lwm2m_engine_send(struct lwm2m_ctx *ctx, char const *path_list[], uint8_t path_list_size,
bool confirmation_request)
{
@ -2843,6 +2994,12 @@ int lwm2m_engine_send(struct lwm2m_ctx *ctx, char const *path_list[], uint8_t pa
struct lwm2m_obj_path_list lwm2m_path_list_buf[CONFIG_LWM2M_COMPOSITE_PATH_LIST_SIZE];
sys_slist_t lwm2m_path_list;
sys_slist_t lwm2m_path_free_list;
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
struct lwm2m_cache_read_info cache_temp_info;
cache_temp_info.entry_size = 0;
cache_temp_info.entry_limit = 0;
#endif
/* Validate Connection */
if (!lwm2m_rd_client_is_registred(ctx)) {
@ -2884,7 +3041,9 @@ int lwm2m_engine_send(struct lwm2m_ctx *ctx, char const *path_list[], uint8_t pa
}
/* Clear path which are part are part of recursive path /1 will include /1/0/1 */
lwm2m_engine_clear_duplicate_path(&lwm2m_path_list, &lwm2m_path_free_list);
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
msg_alloc:
#endif
/* Allocate Message buffer */
msg = lwm2m_get_message(ctx);
if (!msg) {
@ -2892,6 +3051,8 @@ int lwm2m_engine_send(struct lwm2m_ctx *ctx, char const *path_list[], uint8_t pa
return -ENOMEM;
}
msg_init:
if (confirmation_request) {
msg->type = COAP_TYPE_CON;
msg->reply_cb = do_send_reply_cb;
@ -2910,6 +3071,10 @@ int lwm2m_engine_send(struct lwm2m_ctx *ctx, char const *path_list[], uint8_t pa
if (ret) {
goto cleanup;
}
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
msg->cache_info = &cache_temp_info;
#endif
ret = select_writer(&msg->out, content_format);
if (ret) {
goto cleanup;
@ -2926,12 +3091,36 @@ int lwm2m_engine_send(struct lwm2m_ctx *ctx, char const *path_list[], uint8_t pa
ret = do_send_op(msg, content_format, &lwm2m_path_list);
lwm2m_registry_unlock();
if (ret < 0) {
if (lwm2m_timeseries_data_rebuild(msg, ret)) {
/* Message Build fail by ENOMEM and data include timeseries data.
* Try rebuild message again by limiting timeseries data entry lenghts.
*/
goto msg_init;
}
LOG_ERR("Send (err:%d)", ret);
goto cleanup;
}
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
msg->cache_info = NULL;
#endif
LOG_INF("Send op to server (/dp)");
lwm2m_information_interface_send(msg);
#if defined(CONFIG_LWM2M_RESOURCE_DATA_CACHE_SUPPORT)
if (cache_temp_info.entry_size) {
/* Init Path list for continuous message allocation */
lwm2m_engine_path_list_init(&lwm2m_path_list, &lwm2m_path_free_list,
lwm2m_path_list_buf,
CONFIG_LWM2M_COMPOSITE_PATH_LIST_SIZE);
if (init_next_pending_timeseries_data(&cache_temp_info, &lwm2m_path_list,
&lwm2m_path_free_list)) {
goto msg_alloc;
}
}
#endif
return 0;
cleanup:
lwm2m_reset_message(msg, true);