lwm2m: keep track of observations per client
this has a number of advantages: - allows to only create notifications for each client if there are no messages already waiting to be send, in practice prioritizing the memory for messages for answers, thus staying more "responsive". - saves a fair bit of memory by eliminationg now redundant client_ctx pointer per observer. - fixes a potential subtle bug: previously, an observer reset would've stopped the first observation found with a matching token, which might've belonged to a differen client. Signed-off-by: Henning Fleddermann <henning.fleddermann@grandcentrix.net>
This commit is contained in:
parent
32989a38f0
commit
05b2018ffa
2 changed files with 65 additions and 76 deletions
|
@ -72,7 +72,6 @@ LOG_MODULE_REGISTER(LOG_MODULE_NAME);
|
|||
|
||||
struct observe_node {
|
||||
sys_snode_t node;
|
||||
struct lwm2m_ctx *ctx;
|
||||
struct lwm2m_obj_path path;
|
||||
uint8_t token[MAX_TOKEN_LEN];
|
||||
int64_t event_timestamp;
|
||||
|
@ -109,7 +108,6 @@ static struct service_node service_node_data[MAX_PERIODIC_SERVICE];
|
|||
|
||||
static sys_slist_t engine_obj_list;
|
||||
static sys_slist_t engine_obj_inst_list;
|
||||
static sys_slist_t engine_observer_list;
|
||||
static sys_slist_t engine_service_list;
|
||||
|
||||
static K_KERNEL_STACK_DEFINE(engine_thread_stack,
|
||||
|
@ -348,20 +346,23 @@ int lwm2m_notify_observer(uint16_t obj_id, uint16_t obj_inst_id, uint16_t res_id
|
|||
{
|
||||
struct observe_node *obs;
|
||||
int ret = 0;
|
||||
int i;
|
||||
|
||||
/* look for observers which match our resource */
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
|
||||
if (obs->path.obj_id == obj_id &&
|
||||
obs->path.obj_inst_id == obj_inst_id &&
|
||||
(obs->path.level < 3 ||
|
||||
obs->path.res_id == res_id)) {
|
||||
/* update the event time for this observer */
|
||||
obs->event_timestamp = k_uptime_get();
|
||||
for (i = 0; i < sock_nfds; ++i) {
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&sock_ctx[i]->observer, obs, node) {
|
||||
if (obs->path.obj_id == obj_id &&
|
||||
obs->path.obj_inst_id == obj_inst_id &&
|
||||
(obs->path.level < 3 ||
|
||||
obs->path.res_id == res_id)) {
|
||||
/* update the event time for this observer */
|
||||
obs->event_timestamp = k_uptime_get();
|
||||
|
||||
LOG_DBG("NOTIFY EVENT %u/%u/%u",
|
||||
obj_id, obj_inst_id, res_id);
|
||||
LOG_DBG("NOTIFY EVENT %u/%u/%u",
|
||||
obj_id, obj_inst_id, res_id);
|
||||
|
||||
ret++;
|
||||
ret++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -405,10 +406,9 @@ static int engine_add_observer(struct lwm2m_message *msg,
|
|||
/* TODO: observe dup checking */
|
||||
|
||||
/* make sure this observer doesn't exist already */
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&msg->ctx->observer, obs, node) {
|
||||
/* TODO: distinguish server object */
|
||||
if (obs->ctx == msg->ctx &&
|
||||
memcmp(&obs->path, &msg->path, sizeof(msg->path)) == 0) {
|
||||
if (memcmp(&obs->path, &msg->path, sizeof(msg->path)) == 0) {
|
||||
/* quietly update the token information */
|
||||
memcpy(obs->token, token, tkl);
|
||||
obs->tkl = tkl;
|
||||
|
@ -489,7 +489,7 @@ static int engine_add_observer(struct lwm2m_message *msg,
|
|||
|
||||
/* find an unused observer index node */
|
||||
for (i = 0; i < CONFIG_LWM2M_ENGINE_MAX_OBSERVER; i++) {
|
||||
if (!observe_node_data[i].ctx) {
|
||||
if (!observe_node_data[i].tkl) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -500,7 +500,6 @@ static int engine_add_observer(struct lwm2m_message *msg,
|
|||
}
|
||||
|
||||
/* copy the values and add it to the list */
|
||||
observe_node_data[i].ctx = msg->ctx;
|
||||
memcpy(&observe_node_data[i].path, &msg->path, sizeof(msg->path));
|
||||
memcpy(observe_node_data[i].token, token, tkl);
|
||||
observe_node_data[i].tkl = tkl;
|
||||
|
@ -512,7 +511,7 @@ static int engine_add_observer(struct lwm2m_message *msg,
|
|||
: attrs.pmax;
|
||||
observe_node_data[i].format = format;
|
||||
observe_node_data[i].counter = OBSERVE_COUNTER_START;
|
||||
sys_slist_append(&engine_observer_list,
|
||||
sys_slist_append(&msg->ctx->observer,
|
||||
&observe_node_data[i].node);
|
||||
|
||||
LOG_DBG("OBSERVER ADDED %u/%u/%u(%u) token:'%s' addr:%s",
|
||||
|
@ -524,7 +523,7 @@ static int engine_add_observer(struct lwm2m_message *msg,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int engine_remove_observer(const uint8_t *token, uint8_t tkl)
|
||||
static int engine_remove_observer(struct lwm2m_ctx *ctx, const uint8_t *token, uint8_t tkl)
|
||||
{
|
||||
struct observe_node *obs, *found_obj = NULL;
|
||||
sys_snode_t *prev_node = NULL;
|
||||
|
@ -536,7 +535,7 @@ static int engine_remove_observer(const uint8_t *token, uint8_t tkl)
|
|||
}
|
||||
|
||||
/* find the node index */
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&ctx->observer, obs, node) {
|
||||
if (memcmp(obs->token, token, tkl) == 0) {
|
||||
found_obj = obs;
|
||||
break;
|
||||
|
@ -549,7 +548,7 @@ static int engine_remove_observer(const uint8_t *token, uint8_t tkl)
|
|||
return -ENOENT;
|
||||
}
|
||||
|
||||
sys_slist_remove(&engine_observer_list, prev_node, &found_obj->node);
|
||||
sys_slist_remove(&ctx->observer, prev_node, &found_obj->node);
|
||||
(void)memset(found_obj, 0, sizeof(*found_obj));
|
||||
|
||||
LOG_DBG("observer '%s' removed", log_strdup(sprint_token(token, tkl)));
|
||||
|
@ -610,18 +609,21 @@ static void engine_remove_observer_by_id(uint16_t obj_id, int32_t obj_inst_id)
|
|||
{
|
||||
struct observe_node *obs, *tmp;
|
||||
sys_snode_t *prev_node = NULL;
|
||||
int i;
|
||||
|
||||
/* remove observer instances accordingly */
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(
|
||||
&engine_observer_list, obs, tmp, node) {
|
||||
if (!(obj_id == obs->path.obj_id &&
|
||||
obj_inst_id == obs->path.obj_inst_id)) {
|
||||
prev_node = &obs->node;
|
||||
continue;
|
||||
}
|
||||
for (i = 0; i < sock_nfds; ++i) {
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(
|
||||
&sock_ctx[i]->observer, obs, tmp, node) {
|
||||
if (!(obj_id == obs->path.obj_id &&
|
||||
obj_inst_id == obs->path.obj_inst_id)) {
|
||||
prev_node = &obs->node;
|
||||
continue;
|
||||
}
|
||||
|
||||
sys_slist_remove(&engine_observer_list, prev_node, &obs->node);
|
||||
(void)memset(obs, 0, sizeof(*obs));
|
||||
sys_slist_remove(&sock_ctx[i]->observer, prev_node, &obs->node);
|
||||
(void)memset(obs, 0, sizeof(*obs));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3015,7 +3017,7 @@ static int lwm2m_write_attr_handler(struct lwm2m_engine_obj *obj,
|
|||
}
|
||||
|
||||
/* update observe_node accordingly */
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) {
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&msg->ctx->observer, obs, node) {
|
||||
/* updated path is deeper than obs node, skip */
|
||||
if (msg->path.level > obs->path.level) {
|
||||
continue;
|
||||
|
@ -3930,7 +3932,7 @@ static int handle_request(struct coap_packet *request,
|
|||
}
|
||||
} else if (observe == 1) {
|
||||
/* remove observer */
|
||||
r = engine_remove_observer(token, tkl);
|
||||
r = engine_remove_observer(msg->ctx, token, tkl);
|
||||
if (r < 0) {
|
||||
#if defined(CONFIG_LWM2M_CANCEL_OBSERVE_BY_PATH)
|
||||
r = engine_remove_observer_by_path(&msg->path);
|
||||
|
@ -4250,27 +4252,13 @@ static int32_t retransmit_request(struct lwm2m_ctx *client_ctx,
|
|||
return next_retransmission;
|
||||
}
|
||||
|
||||
static int32_t check_retransmissions(const uint32_t timestamp)
|
||||
{
|
||||
int32_t next_retransmit;
|
||||
int32_t next_retransmit_min = INT32_MAX;
|
||||
|
||||
for (int i = 0; i < sock_nfds; ++i) {
|
||||
next_retransmit = retransmit_request(sock_ctx[i], timestamp);
|
||||
if (next_retransmit < next_retransmit_min) {
|
||||
next_retransmit_min = next_retransmit;
|
||||
}
|
||||
}
|
||||
|
||||
return next_retransmit_min;
|
||||
}
|
||||
|
||||
static int notify_message_reply_cb(const struct coap_packet *response,
|
||||
struct coap_reply *reply,
|
||||
const struct sockaddr *from)
|
||||
{
|
||||
int ret = 0;
|
||||
uint8_t type, code;
|
||||
struct lwm2m_message *msg;
|
||||
|
||||
type = coap_header_get_type(response);
|
||||
code = coap_header_get_code(response);
|
||||
|
@ -4284,7 +4272,8 @@ static int notify_message_reply_cb(const struct coap_packet *response,
|
|||
/* remove observer on COAP_TYPE_RESET */
|
||||
if (type == COAP_TYPE_RESET) {
|
||||
if (reply->tkl > 0) {
|
||||
ret = engine_remove_observer(reply->token, reply->tkl);
|
||||
msg = find_msg(NULL, reply);
|
||||
ret = engine_remove_observer(msg->ctx, reply->token, reply->tkl);
|
||||
if (ret) {
|
||||
LOG_ERR("remove observe error: %d", ret);
|
||||
}
|
||||
|
@ -4296,19 +4285,15 @@ static int notify_message_reply_cb(const struct coap_packet *response,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int generate_notify_message(struct observe_node *obs,
|
||||
static int generate_notify_message(struct lwm2m_ctx *ctx,
|
||||
struct observe_node *obs,
|
||||
bool manual_trigger)
|
||||
{
|
||||
struct lwm2m_message *msg;
|
||||
struct lwm2m_engine_obj_inst *obj_inst;
|
||||
int ret = 0;
|
||||
|
||||
if (!obs->ctx) {
|
||||
LOG_ERR("observer has no valid LwM2M ctx!");
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
msg = lwm2m_get_message(obs->ctx);
|
||||
msg = lwm2m_get_message(ctx);
|
||||
if (!msg) {
|
||||
LOG_ERR("Unable to get a lwm2m message!");
|
||||
return -ENOMEM;
|
||||
|
@ -4325,7 +4310,7 @@ static int generate_notify_message(struct observe_node *obs,
|
|||
obs->path.res_id,
|
||||
obs->path.level,
|
||||
log_strdup(sprint_token(obs->token, obs->tkl)),
|
||||
log_strdup(lwm2m_sprint_ip_addr(&obs->ctx->remote_addr)),
|
||||
log_strdup(lwm2m_sprint_ip_addr(&ctx->remote_addr)),
|
||||
k_uptime_get());
|
||||
|
||||
obj_inst = get_engine_obj_inst(obs->path.obj_id,
|
||||
|
@ -4466,22 +4451,17 @@ static int32_t lwm2m_engine_service(const int64_t timestamp)
|
|||
|
||||
int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx)
|
||||
{
|
||||
struct observe_node *obs, *tmp;
|
||||
sys_snode_t *prev_node = NULL;
|
||||
int sock_fd = client_ctx->sock_fd;
|
||||
struct lwm2m_message *msg;
|
||||
sys_snode_t *obs_node;
|
||||
struct observe_node *obs;
|
||||
size_t i;
|
||||
|
||||
/* Remove observes for this context */
|
||||
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&engine_observer_list, obs, tmp,
|
||||
node) {
|
||||
if (obs->ctx == client_ctx) {
|
||||
sys_slist_remove(&engine_observer_list, prev_node,
|
||||
&obs->node);
|
||||
(void)memset(obs, 0, sizeof(*obs));
|
||||
} else {
|
||||
prev_node = &obs->node;
|
||||
}
|
||||
while (!sys_slist_is_empty(&client_ctx->observer)) {
|
||||
obs_node = sys_slist_get_not_empty(&client_ctx->observer);
|
||||
obs = SYS_SLIST_CONTAINER(obs_node, obs, node);
|
||||
(void)memset(obs, 0, sizeof(*obs));
|
||||
}
|
||||
|
||||
for (i = 0, msg = messages; i < ARRAY_SIZE(messages); i++, msg++) {
|
||||
|
@ -4507,6 +4487,7 @@ int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx)
|
|||
void lwm2m_engine_context_init(struct lwm2m_ctx *client_ctx)
|
||||
{
|
||||
sys_slist_init(&client_ctx->pending_sends);
|
||||
sys_slist_init(&client_ctx->observer);
|
||||
}
|
||||
|
||||
/* LwM2M Socket Integration */
|
||||
|
@ -4564,23 +4545,23 @@ static bool automatic_notify_is_due(const struct observe_node *obs,
|
|||
const bool has_max_period = obs->max_period_sec != 0;
|
||||
|
||||
return has_max_period && (timestamp > obs->last_timestamp +
|
||||
MSEC_PER_SEC * obs->max_period_sec);
|
||||
MSEC_PER_SEC * obs->max_period_sec);
|
||||
}
|
||||
|
||||
static void check_notifications(sys_slist_t *observer_list,
|
||||
const int64_t timestamp)
|
||||
static void check_notifications(struct lwm2m_ctx *ctx,
|
||||
const int64_t timestamp)
|
||||
{
|
||||
struct observe_node *obs;
|
||||
int rc;
|
||||
bool manual_notify, automatic_notify;
|
||||
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(observer_list, obs, node) {
|
||||
SYS_SLIST_FOR_EACH_CONTAINER(&ctx->observer, obs, node) {
|
||||
manual_notify = manual_notify_is_due(obs, timestamp);
|
||||
automatic_notify = automatic_notify_is_due(obs, timestamp);
|
||||
if (!manual_notify && !automatic_notify) {
|
||||
continue;
|
||||
}
|
||||
rc = generate_notify_message(obs, manual_notify);
|
||||
rc = generate_notify_message(ctx, obs, manual_notify);
|
||||
if (rc == -ENOMEM) {
|
||||
/* no memory/messages available, retry later */
|
||||
return;
|
||||
|
@ -4665,11 +4646,18 @@ static void socket_loop(void)
|
|||
continue;
|
||||
}
|
||||
|
||||
next_retransmit = check_retransmissions(timestamp);
|
||||
if (next_retransmit < timeout) {
|
||||
timeout = next_retransmit;
|
||||
for (i = 0; i < sock_nfds; ++i) {
|
||||
if (sys_slist_is_empty(&sock_ctx[i]->pending_sends)) {
|
||||
next_retransmit = retransmit_request(sock_ctx[i], timestamp);
|
||||
if (next_retransmit < timeout) {
|
||||
timeout = next_retransmit;
|
||||
}
|
||||
}
|
||||
if (sys_slist_is_empty(&sock_ctx[i]->pending_sends)) {
|
||||
check_notifications(sock_ctx[i], timestamp);
|
||||
}
|
||||
}
|
||||
check_notifications(&engine_observer_list, timestamp);
|
||||
|
||||
socket_reset_pollfd_events();
|
||||
|
||||
/*
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue