diff --git a/include/net/lwm2m.h b/include/net/lwm2m.h index 3c0f01f793e..2e346a2707e 100644 --- a/include/net/lwm2m.h +++ b/include/net/lwm2m.h @@ -77,8 +77,7 @@ struct lwm2m_ctx { /** Private CoAP and networking structures */ struct coap_pending pendings[CONFIG_LWM2M_ENGINE_MAX_PENDING]; struct coap_reply replies[CONFIG_LWM2M_ENGINE_MAX_REPLIES]; - struct k_work_delayable retransmit_work; - struct sys_mutex send_lock; + sys_slist_t pending_sends; /** A pointer to currently processed request, for internal LwM2M engine * use. The underlying type is ``struct lwm2m_message``, but since it's diff --git a/subsys/net/lib/lwm2m/lwm2m_engine.c b/subsys/net/lib/lwm2m/lwm2m_engine.c index efe0672500d..88371af305c 100644 --- a/subsys/net/lib/lwm2m/lwm2m_engine.c +++ b/subsys/net/lib/lwm2m/lwm2m_engine.c @@ -18,6 +18,7 @@ #include LOG_MODULE_REGISTER(LOG_MODULE_NAME); +#include #include #include #include @@ -26,7 +27,6 @@ LOG_MODULE_REGISTER(LOG_MODULE_NAME); #include #include #include -#include #include #include #include @@ -1024,9 +1024,15 @@ cleanup: return r; } -int lwm2m_send_message(struct lwm2m_message *msg) +int lwm2m_send_message_async(struct lwm2m_message *msg) { - int rc, ret; + sys_slist_append(&msg->ctx->pending_sends, &msg->node); + return 0; +} + +static int lwm2m_send_message(struct lwm2m_message *msg) +{ + int rc; if (!msg || !msg->ctx) { LOG_ERR("LwM2M message is invalid."); @@ -1037,37 +1043,23 @@ int lwm2m_send_message(struct lwm2m_message *msg) coap_pending_cycle(msg->pending); } - msg->send_attempts++; - - ret = sys_mutex_lock(&msg->ctx->send_lock, K_FOREVER); - __ASSERT(ret == 0, "sys_mutex_lock failed with %d", ret); rc = send(msg->ctx->sock_fd, msg->cpkt.data, msg->cpkt.offset, 0); - ret = sys_mutex_unlock(&msg->ctx->send_lock); - __ASSERT(ret == 0, "sys_mutex_unlock failed with %d", ret); - ARG_UNUSED(ret); + if (rc < 0) { - if (msg->type == COAP_TYPE_CON) { - coap_pending_clear(msg->pending); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + LOG_WRN("Failed to send packet, would block"); + return -errno; } LOG_ERR("Failed to send packet, err %d", errno); + if (msg->type != COAP_TYPE_CON) { + lwm2m_reset_message(msg, true); + } return -errno; } - if (msg->type == COAP_TYPE_CON) { - int32_t remaining = - k_ticks_to_ms_ceil32(k_work_delayable_remaining_get( - &msg->ctx->retransmit_work)); - - /* If the item is already pending and its timeout is smaller - * than the new one, skip the submission. - */ - if (remaining == 0 || remaining > msg->pending->timeout) { - k_work_reschedule(&msg->ctx->retransmit_work, - K_MSEC(msg->pending->timeout)); - } - } else { + if (msg->type != COAP_TYPE_CON) { lwm2m_reset_message(msg, true); } @@ -1099,11 +1091,7 @@ int lwm2m_send_empty_ack(struct lwm2m_ctx *client_ctx, uint16_t mid) goto cleanup; } - ret = lwm2m_send_message(msg); - if (ret < 0) { - LOG_ERR("Error sending LWM2M packet (err:%d).", ret); - goto cleanup; - } + lwm2m_send_message_async(msg); return 0; @@ -4204,102 +4192,77 @@ static void lwm2m_udp_receive(struct lwm2m_ctx *client_ctx, } client_ctx->processed_req = NULL; - - r = lwm2m_send_message(msg); - if (r < 0) { - LOG_ERR("Err sending response: %d", r); - lwm2m_reset_message(msg, true); - } + lwm2m_send_message_async(msg); } else { LOG_DBG("No handler for response"); } } -static void retransmit_request(struct k_work *work) +/* returns ms until the next retransmission is due, or INT32_MAX + * if no retransmissions are necessary + */ +static int32_t retransmit_request(struct lwm2m_ctx *client_ctx, + const uint32_t timestamp) { - struct lwm2m_ctx *client_ctx; struct lwm2m_message *msg; - struct coap_pending *pending; - int32_t remaining; - int ret; + struct coap_pending *p; + int32_t remaining, next_retransmission = INT32_MAX; + int i; - client_ctx = CONTAINER_OF(work, struct lwm2m_ctx, retransmit_work); - pending = coap_pending_next_to_expire(client_ctx->pendings, - CONFIG_LWM2M_ENGINE_MAX_PENDING); - if (!pending) { - return; - } - - remaining = pending->t0 + pending->timeout - k_uptime_get_32(); - if (remaining > 0) { - /* First message to expire was removed from the list, - * schedule next. - */ - goto next; - } - - msg = find_msg(pending, NULL); - if (!msg) { - LOG_ERR("pending has no valid LwM2M message!"); - coap_pending_clear(pending); - goto next; - } - - if (!coap_pending_cycle(pending)) { - /* pending request has expired */ - if (msg->message_timeout_cb) { - msg->message_timeout_cb(msg); + for (i = 0, p = client_ctx->pendings; + i < CONFIG_LWM2M_ENGINE_MAX_PENDING; i++, p++) { + if (!p->timeout) { + continue; } - /* - * coap_pending_clear() is called in lwm2m_reset_message() - * which balances the ref we made in coap_pending_cycle() - */ - lwm2m_reset_message(msg, true); - goto next; + remaining = p->t0 + p->timeout - timestamp; + if (remaining < 0) { + msg = find_msg(p, NULL); + if (!msg) { + LOG_ERR("pending has no valid LwM2M message!"); + coap_pending_clear(p); + continue; + } + if (!p->retries) { + /* pending request has expired */ + if (msg->message_timeout_cb) { + msg->message_timeout_cb(msg); + } + lwm2m_reset_message(msg, true); + continue; + } + if (msg->acknowledged) { + /* No need to retransmit, just keep the timer running to + * timeout in case no response arrives. + */ + coap_pending_cycle(p); + continue; + } + + lwm2m_send_message_async(msg); + break; + } + if (remaining < next_retransmission) { + next_retransmission = remaining; + } } - if (msg->acknowledged) { - /* No need to retransmit, just keep the timer running to - * timeout in case no response arrives. - */ - goto next; + return next_retransmission; +} + +static int32_t check_retransmissions(const uint32_t timestamp) +{ + int32_t next_retransmit; + int32_t next_retransmit_min = INT32_MAX; + + for (int i = 0; i < sock_nfds; ++i) { + next_retransmit = retransmit_request(sock_ctx[i], timestamp); + if (next_retransmit < next_retransmit_min) { + next_retransmit_min = next_retransmit; + } } - LOG_INF("Resending message: %p", msg); - msg->send_attempts++; - - ret = sys_mutex_lock(&client_ctx->send_lock, K_FOREVER); - __ASSERT(ret == 0, "sys_mutex_lock failed with %d", ret); - - if (msg->ctx == NULL) { - LOG_INF("Response for %p already handled", msg); - goto next_locked; - } - - if (send(msg->ctx->sock_fd, msg->cpkt.data, msg->cpkt.offset, 0) < 0) { - LOG_ERR("Error sending lwm2m message: %d", -errno); - /* don't error here, retry until timeout */ - } - -next_locked: - ret = sys_mutex_unlock(&client_ctx->send_lock); - __ASSERT(ret == 0, "sys_mutex_unlock failed with %d", ret); - ARG_UNUSED(ret); - -next: - pending = coap_pending_next_to_expire(client_ctx->pendings, - CONFIG_LWM2M_ENGINE_MAX_PENDING); - if (!pending) { - return; - } - - remaining = pending->t0 + pending->timeout - k_uptime_get_32(); - if (remaining < 0) { - remaining = 0; - } - - k_work_reschedule(&client_ctx->retransmit_work, K_MSEC(remaining)); + return next_retransmit_min; } static int notify_message_reply_cb(const struct coap_packet *response, @@ -4407,11 +4370,7 @@ static int generate_notify_message(struct observe_node *obs, goto cleanup; } - ret = lwm2m_send_message(msg); - if (ret < 0) { - LOG_ERR("Error sending LWM2M packet (err:%d).", ret); - goto cleanup; - } + lwm2m_send_message_async(msg); LOG_DBG("NOTIFY MSG: SENT"); return 0; @@ -4421,10 +4380,11 @@ cleanup: return ret; } -int32_t engine_next_service_timeout_ms(uint32_t max_timeout) +static int32_t engine_next_service_timeout_ms(uint32_t max_timeout, + const int64_t timestamp) { struct service_node *srv; - uint64_t time_left_ms, timestamp = k_uptime_get(); + uint64_t time_left_ms; uint32_t timeout = max_timeout; SYS_SLIST_FOR_EACH_CONTAINER(&engine_service_list, srv, node) { @@ -4484,48 +4444,11 @@ int lwm2m_engine_update_service_period(k_work_handler_t service, uint32_t period return -ENOENT; } -static int lwm2m_engine_service(void) +static int32_t lwm2m_engine_service(const int64_t timestamp) { - struct observe_node *obs; struct service_node *srv; - int64_t timestamp, service_due_timestamp; + int64_t service_due_timestamp; - /* - * 1. scan the observer list - * 2. For each notify event found, scan the observer list - * 3. For each observer match, generate a NOTIFY message, - * attaching the notify response handler - */ - timestamp = k_uptime_get(); - SYS_SLIST_FOR_EACH_CONTAINER(&engine_observer_list, obs, node) { - /* - * manual notify requirements: - * - event_timestamp > last_timestamp - * - if min_period_sec is set: - * current timestamp > last_timestamp + min_period_sec - */ - if (obs->event_timestamp > obs->last_timestamp && - (obs->min_period_sec == 0 || - timestamp > obs->last_timestamp + - MSEC_PER_SEC * obs->min_period_sec)) { - obs->last_timestamp = k_uptime_get(); - generate_notify_message(obs, true); - - /* - * automatic time-based notify requirements: - * - if max_period_sec is set: - * current timestamp > last_timestamp + max_period_sec - */ - } else if (obs->max_period_sec > 0 && - timestamp > obs->last_timestamp + - MSEC_PER_SEC * obs->max_period_sec) { - obs->last_timestamp = k_uptime_get(); - generate_notify_message(obs, false); - } - - } - - timestamp = k_uptime_get(); SYS_SLIST_FOR_EACH_CONTAINER(&engine_service_list, srv, node) { service_due_timestamp = srv->last_timestamp + srv->min_call_period; @@ -4537,7 +4460,8 @@ static int lwm2m_engine_service(void) } /* calculate how long to sleep till the next service */ - return engine_next_service_timeout_ms(ENGINE_UPDATE_INTERVAL_MS); + return engine_next_service_timeout_ms(ENGINE_UPDATE_INTERVAL_MS, + timestamp); } int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx) @@ -4548,12 +4472,9 @@ int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx) struct lwm2m_message *msg; size_t i; - /* Cancel pending retransmit work */ - k_work_cancel_delayable(&client_ctx->retransmit_work); - /* Remove observes for this context */ - SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&engine_observer_list, - obs, tmp, node) { + SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&engine_observer_list, obs, tmp, + node) { if (obs->ctx == client_ctx) { sys_slist_remove(&engine_observer_list, prev_node, &obs->node); @@ -4585,8 +4506,7 @@ int lwm2m_engine_context_close(struct lwm2m_ctx *client_ctx) void lwm2m_engine_context_init(struct lwm2m_ctx *client_ctx) { - k_work_init_delayable(&client_ctx->retransmit_work, retransmit_request); - sys_mutex_init(&client_ctx->send_lock); + sys_slist_init(&client_ctx->pending_sends); } /* LwM2M Socket Integration */ @@ -4628,28 +4548,136 @@ void lwm2m_socket_del(struct lwm2m_ctx *ctx) } } -/* LwM2M main work loop */ - -static void socket_receive_loop(void) +static bool manual_notify_is_due(const struct observe_node *obs, + const int64_t timestamp) { - static uint8_t in_buf[NET_IPV6_MTU]; - static struct sockaddr from_addr; - socklen_t from_addr_len; - ssize_t len; - int i; + const bool has_min_period = obs->min_period_sec != 0; - while (1) { - /* wait for sockets */ - if (sock_nfds < 1) { - k_msleep(lwm2m_engine_service()); + return obs->event_timestamp > obs->last_timestamp && + (!has_min_period || timestamp > obs->last_timestamp + + MSEC_PER_SEC * obs->min_period_sec); +} + +static bool automatic_notify_is_due(const struct observe_node *obs, + const int64_t timestamp) +{ + const bool has_max_period = obs->max_period_sec != 0; + + return has_max_period && (timestamp > obs->last_timestamp + + MSEC_PER_SEC * obs->max_period_sec); +} + +static void check_notifications(sys_slist_t *observer_list, + const int64_t timestamp) +{ + struct observe_node *obs; + int rc; + bool manual_notify, automatic_notify; + + SYS_SLIST_FOR_EACH_CONTAINER(observer_list, obs, node) { + manual_notify = manual_notify_is_due(obs, timestamp); + automatic_notify = automatic_notify_is_due(obs, timestamp); + if (!manual_notify && !automatic_notify) { continue; } + rc = generate_notify_message(obs, manual_notify); + if (rc == -ENOMEM) { + /* no memory/messages available, retry later */ + return; + } + obs->last_timestamp = timestamp; + if (!rc) { + /* create at most one notification */ + return; + } + } +} + +static int socket_recv_message(struct lwm2m_ctx *client_ctx) +{ + static uint8_t in_buf[NET_IPV6_MTU]; + socklen_t from_addr_len; + ssize_t len; + static struct sockaddr from_addr; + + from_addr_len = sizeof(from_addr); + len = recvfrom(client_ctx->sock_fd, in_buf, sizeof(in_buf) - 1, + 0, &from_addr, &from_addr_len); + + if (len < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return -errno; + } + + LOG_ERR("Error reading response: %d", errno); + if (client_ctx->fault_cb != NULL) { + client_ctx->fault_cb(errno); + } + return -errno; + } + + if (len == 0) { + LOG_ERR("Zero length recv"); + return 0; + } + + in_buf[len] = 0U; + lwm2m_udp_receive(client_ctx, in_buf, len, &from_addr, handle_request); + + return 0; +} + +static int socket_send_message(struct lwm2m_ctx *client_ctx) +{ + sys_snode_t *msg_node = sys_slist_get(&client_ctx->pending_sends); + struct lwm2m_message *msg; + + if (!msg_node) { + return 0; + } + msg = SYS_SLIST_CONTAINER(msg_node, msg, node); + return lwm2m_send_message(msg); +} + +static void socket_reset_pollfd_events(void) +{ + for (int i = 0; i < sock_nfds; ++i) { + sock_fds[i].events = POLLIN + | (sys_slist_is_empty(&sock_ctx[i]->pending_sends) ? 0 : POLLOUT); + sock_fds[i].revents = 0; + } +} + +/* LwM2M main work loop */ +static void socket_loop(void) +{ + int i, rc; + int64_t timestamp; + int32_t timeout, next_retransmit; + + while (1) { + timestamp = k_uptime_get(); + timeout = lwm2m_engine_service(timestamp); + + /* wait for sockets */ + if (sock_nfds < 1) { + k_msleep(timeout); + continue; + } + + next_retransmit = check_retransmissions(timestamp); + if (next_retransmit < timeout) { + timeout = next_retransmit; + } + check_notifications(&engine_observer_list, timestamp); + socket_reset_pollfd_events(); /* * FIXME: Currently we timeout and restart poll in case fds * were modified. */ - if (poll(sock_fds, sock_nfds, lwm2m_engine_service()) < 0) { + rc = poll(sock_fds, sock_nfds, timeout); + if (rc < 0) { LOG_ERR("Error in poll:%d", errno); errno = 0; k_msleep(ENGINE_UPDATE_INTERVAL_MS); @@ -4669,35 +4697,18 @@ static void socket_receive_loop(void) continue; } - if (!(sock_fds[i].revents & POLLIN) || - sock_ctx[i] == NULL) { - sock_fds[i].revents = 0; - continue; - } - - from_addr_len = sizeof(from_addr); - sock_fds[i].revents = 0; - len = recvfrom(sock_ctx[i]->sock_fd, in_buf, - sizeof(in_buf) - 1, 0, - &from_addr, &from_addr_len); - - if (len < 0) { - LOG_ERR("Error reading response: %d", errno); - if (sock_ctx[i]->fault_cb != NULL) { - sock_ctx[i]->fault_cb(errno); + if (sock_fds[i].revents & POLLIN) { + while (sock_ctx[i]) { + rc = socket_recv_message(sock_ctx[i]); + if (rc) { + break; + } } - continue; } - if (len == 0) { - LOG_ERR("Zero length recv"); - continue; + if (sock_fds[i].revents & POLLOUT) { + socket_send_message(sock_ctx[i]); } - - in_buf[len] = 0U; - - lwm2m_udp_receive(sock_ctx[i], in_buf, len, &from_addr, - handle_request); } } } @@ -4737,6 +4748,7 @@ static int load_tls_credential(struct lwm2m_ctx *client_ctx, uint16_t res_id, int lwm2m_socket_start(struct lwm2m_ctx *client_ctx) { + int flags; #if defined(CONFIG_LWM2M_DTLS_SUPPORT) int ret; @@ -4796,6 +4808,12 @@ int lwm2m_socket_start(struct lwm2m_ctx *client_ctx) return -errno; } + flags = fcntl(client_ctx->sock_fd, F_GETFL, 0); + if (flags == -1) { + return -errno; + } + fcntl(client_ctx->sock_fd, F_SETFL, flags | O_NONBLOCK); + return lwm2m_socket_add(client_ctx); } @@ -4934,22 +4952,17 @@ int lwm2m_engine_start(struct lwm2m_ctx *client_ctx) static int lwm2m_engine_init(const struct device *dev) { - int ret = 0; - (void)memset(block1_contexts, 0, sizeof(block1_contexts)); /* start sock receive thread */ - k_thread_create(&engine_thread_data, - &engine_thread_stack[0], + k_thread_create(&engine_thread_data, &engine_thread_stack[0], K_KERNEL_STACK_SIZEOF(engine_thread_stack), - (k_thread_entry_t) socket_receive_loop, - NULL, NULL, NULL, - THREAD_PRIORITY, - 0, K_NO_WAIT); + (k_thread_entry_t)socket_loop, NULL, NULL, NULL, + THREAD_PRIORITY, 0, K_NO_WAIT); k_thread_name_set(&engine_thread_data, "lwm2m-sock-recv"); LOG_DBG("LWM2M engine socket receive thread started"); - return ret; + return 0; } SYS_INIT(lwm2m_engine_init, APPLICATION, CONFIG_KERNEL_INIT_PRIORITY_DEFAULT); diff --git a/subsys/net/lib/lwm2m/lwm2m_engine.h b/subsys/net/lib/lwm2m/lwm2m_engine.h index 81e31c4b082..fd65f964284 100644 --- a/subsys/net/lib/lwm2m/lwm2m_engine.h +++ b/subsys/net/lib/lwm2m/lwm2m_engine.h @@ -91,7 +91,7 @@ int lwm2m_put_message_buf(uint8_t *buf); struct lwm2m_message *lwm2m_get_message(struct lwm2m_ctx *client_ctx); void lwm2m_reset_message(struct lwm2m_message *msg, bool release); int lwm2m_init_message(struct lwm2m_message *msg); -int lwm2m_send_message(struct lwm2m_message *msg); +int lwm2m_send_message_async(struct lwm2m_message *msg); int lwm2m_send_empty_ack(struct lwm2m_ctx *client_ctx, uint16_t mid); int lwm2m_register_payload_handler(struct lwm2m_message *msg); diff --git a/subsys/net/lib/lwm2m/lwm2m_obj_firmware_pull.c b/subsys/net/lib/lwm2m/lwm2m_obj_firmware_pull.c index a104c844463..db32f5f0883 100644 --- a/subsys/net/lib/lwm2m/lwm2m_obj_firmware_pull.c +++ b/subsys/net/lib/lwm2m/lwm2m_obj_firmware_pull.c @@ -182,7 +182,7 @@ static int transfer_request(struct coap_block_context *ctx, #endif /* send request */ - ret = lwm2m_send_message(msg); + ret = lwm2m_send_message_async(msg); if (ret < 0) { LOG_ERR("Error sending LWM2M packet (err:%d).", ret); goto cleanup; diff --git a/subsys/net/lib/lwm2m/lwm2m_object.h b/subsys/net/lib/lwm2m/lwm2m_object.h index b2dd7282139..eb94b68932e 100644 --- a/subsys/net/lib/lwm2m/lwm2m_object.h +++ b/subsys/net/lib/lwm2m/lwm2m_object.h @@ -444,6 +444,8 @@ typedef void (*lwm2m_message_timeout_cb_t)(struct lwm2m_message *msg); /* Internal LwM2M message structure to track in-flight messages. */ struct lwm2m_message { + sys_snode_t node; + /** LwM2M context related to this message */ struct lwm2m_ctx *ctx; @@ -476,9 +478,6 @@ struct lwm2m_message { /** Incoming message action */ uint8_t operation; - /** Counter for message re-send / abort handling */ - uint8_t send_attempts; - /* Information whether the message was acknowledged. */ bool acknowledged : 1; }; diff --git a/subsys/net/lib/lwm2m/lwm2m_rd_client.c b/subsys/net/lib/lwm2m/lwm2m_rd_client.c index 85e53b48934..df5abb0466e 100644 --- a/subsys/net/lib/lwm2m/lwm2m_rd_client.c +++ b/subsys/net/lib/lwm2m/lwm2m_rd_client.c @@ -779,7 +779,7 @@ static int sm_send_registration(bool send_obj_support_data, } } - ret = lwm2m_send_message(msg); + ret = lwm2m_send_message_async(msg); if (ret < 0) { LOG_ERR("Error sending LWM2M packet (err:%d).", ret); goto cleanup; @@ -929,7 +929,7 @@ static int sm_do_deregister(void) LOG_INF("Deregister from '%s'", log_strdup(client.server_ep)); - ret = lwm2m_send_message(msg); + ret = lwm2m_send_message_async(msg); if (ret < 0) { LOG_ERR("Error sending LWM2M packet (err:%d).", ret); goto cleanup;